diff --git a/integrations/grafana/m3aggregator_end_to_end_details.json b/integrations/grafana/m3aggregator_end_to_end_details.json new file mode 100644 index 0000000000..4c0df4f9ee --- /dev/null +++ b/integrations/grafana/m3aggregator_end_to_end_details.json @@ -0,0 +1,2011 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 1, + "id": 32, + "iteration": 1582150012033, + "links": [], + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 196, + "panels": [], + "title": "Send to Aggregator", + "type": "row" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 6, + "x": 0, + "y": 1 + }, + "id": 198, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_downsampler_remote_aggregator_client_writeForwarded_success[$step]))", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Write Timed/Forwarded Success", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + 
{ + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 6, + "x": 6, + "y": 1 + }, + "id": 199, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_downsampler_remote_aggregator_client_writeForwarded_errors[$step]))", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Write Timed/Forwarded Errors", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 6, + "x": 12, + "y": 1 + }, + "id": 200, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + 
"pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_downsampler_remote_aggregator_client_writer_manager_writer_queue_successes[$step]))", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Write Queue Success", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 6, + "x": 18, + "y": 1 + }, + "id": 201, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_downsampler_remote_aggregator_client_writer_manager_writer_queue_dropped[$step]))", + "refId": "A" + }, + { + "expr": "sum(rate(coordinator_downsampler_remote_aggregator_client_writer_manager_writer_queue_errors[$step]))", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Write Queue Dropped/Errors", + "tooltip": { + "shared": true, + "sort": 0, + 
"value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 9 + }, + "id": 194, + "panels": [], + "title": "Arriving at Aggregator", + "type": "row" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "editable": true, + "error": false, + "fill": 0, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 7, + "w": 8, + "x": 0, + "y": 10 + }, + "id": 4, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "connected", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(m3aggregator_aggregator_addUntimed_success{m3_cluster=~\"$cluster\"}[$step]))", + "key": 0.32458788643639513, + "legendFormat": "add untimed success", + "refId": "A" + }, + { + "expr": "sum(rate(m3aggregator_aggregator_addForwarded_success{m3_cluster=~\"$cluster\"}[$step]))", + "hide": false, + "key": 0.32458788643639513, + "legendFormat": "add forwarded success", + "refId": "C" + }, + { + "expr": "sum(rate(m3aggregator_aggregator_addTimed_success{m3_cluster=~\"$cluster\"}[$step]))", + "hide": false, + "key": 0.32458788643639513, + "legendFormat": "add timed success", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + 
"timeShift": null, + "title": "Writes Success / s", + "tooltip": { + "shared": false, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "percentunit", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "editable": true, + "error": false, + "fill": 0, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 7, + "w": 8, + "x": 8, + "y": 10 + }, + "id": 12, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "connected", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(m3aggregator_aggregator_addUntimed_errors{m3_cluster=~\"$cluster\"}[$step])) by (reason)", + "key": 0.015375561225046352, + "legendFormat": "{{reason}}", + "refId": "D" + }, + { + "expr": "sum(rate(m3aggregator_aggregator_addForwarded_errors{m3_cluster=~\"$cluster\"}[$step])) by (reason)", + "hide": false, + "key": 0.015375561225046352, + "legendFormat": "{{reason}}", + "refId": "A" + }, + { + "expr": "sum(rate(m3aggregator_rawtcp_server_unknown_message_type_errors{m3_cluster=~\"$cluster\"}[$step]))", + "hide": false, + "key": 0.9314139646772208, + "legendFormat": "unknown message type errors", + "refId": "B" + }, + { + "expr": "sum(rate(m3aggregator_rawtcp_server_unknown_error_type_errors{m3_cluster=~\"$cluster\"}[$step]))", + "legendFormat": 
"unknown error type errors", + "refId": "E" + }, + { + "expr": "sum(rate(m3aggregator_rawtcp_server_decode_errors{m3_cluster=~\"$cluster\"}[$step]))", + "legendFormat": "server decode errors", + "refId": "F" + }, + { + "expr": "sum(rate(m3aggregator_aggregator_addTimed_errors{m3_cluster=~\"$cluster\"}[$step])) by (reason)", + "hide": false, + "key": 0.015375561225046352, + "legendFormat": "add timed error {{reason}}", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Write Errors / s", + "tooltip": { + "shared": false, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "percentunit", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 8, + "x": 16, + "y": 10 + }, + "id": 171, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "rate(m3aggregator_aggregator_flush_handler_drop_oldest_async{m3_cluster=~\"$cluster\"}[$step])", + "legendFormat": "dropped {{reason}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Metrics Dropped (All Consumers)", + "tooltip": { + 
"shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 17 + }, + "id": 191, + "panels": [], + "repeat": null, + "title": "Leaving Aggregator", + "type": "row" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 0, + "y": 18 + }, + "id": 135, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "connected", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(m3aggregator_aggregator_flush_handler_message_buffered{m3_cluster=~\"$cluster\",backend=\"m3msg\",component=\"producer\"}) by (instance)", + "legendFormat": "{{instance}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Total Messages Buffered", + "tooltip": { + "shared": false, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": 
null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 6, + "y": 18 + }, + "id": 126, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 5, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "m3aggregator_aggregator_flush_handler_buffer_message_dropped{m3_cluster=~\"$cluster\",backend=\"m3msg\", component=\"producer\"}", + "legendFormat": "{{instance}} {{__name__}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Message Dropped", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 12, + "y": 18 + }, + "id": 192, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + 
"options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 5, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "m3aggregator_aggregator_flush_handler_drop_oldest_async{m3_cluster=~\"$cluster\",backend=\"m3msg\",component=\"producer\"}", + "legendFormat": "{{instance}}", + "refId": "A" + }, + { + "expr": "m3aggregator_aggregator_flush_handler_drop_oldest_sync{m3_cluster=~\"$cluster\",backend=\"m3msg\",component=\"producer\"}", + "legendFormat": "{{instance}}", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Drop types", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 0, + "y": 25 + }, + "id": 124, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "connected", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(m3aggregator_aggregator_flush_handler_byte_buffered{m3_cluster=~\"$cluster\",backend=\"m3msg\", component=\"producer\"}) by 
(instance)", + "legendFormat": "{{instance}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Total Bytes buffered", + "tooltip": { + "shared": false, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "decbytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 6, + "y": 25 + }, + "id": 127, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 5, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "m3aggregator_aggregator_flush_handler_buffer_byte_dropped{m3_cluster=~\"$cluster\",backend=\"m3msg\",component=\"producer\"}", + "legendFormat": "{{instance}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Bytes Dropped", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + 
"logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 32 + }, + "id": 203, + "panels": [], + "title": "Arriving at Coordinator", + "type": "row" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 0, + "y": 33 + }, + "id": 205, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_ingest_m3msg_ack_sent{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Coordinator M3Msg Ingest Ack", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 6, + "y": 33 + }, + "id": 206, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": 
true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_ingest_m3msg_errors{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "A" + }, + { + "expr": "sum(rate(coordinator_ingest_m3msg_ack_encode_error{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "B" + }, + { + "expr": "sum(rate(coordinator_ingest_m3msg_ack_write_error{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "C" + }, + { + "expr": "sum(rate(coordinator_ingest_m3msg_errors_final{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "D" + }, + { + "expr": "sum(rate(coordinator_ingest_m3msg_message_decode_error{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "E" + }, + { + "expr": "sum(rate(coordinator_ingest_m3msg_message_read_error{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "F" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Coordinator M3Msg Ingest Errors", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 12, + "y": 33 + }, + "id": 207, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + 
"nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_ingest_m3msg_metric_accepted{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Coordinator M3Msg Ingest Accepted", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 18, + "y": 33 + }, + "id": 208, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_ingest_m3msg_metric_dropped{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Coordinator M3Msg Ingest Dropped", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": 
null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 0, + "y": 40 + }, + "id": 210, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_ingest_success{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Coordinator M3Msg Ingest Success", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 6, + "x": 6, + "y": 40 + }, + "id": 211, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": 
true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(coordinator_ingest_error{m3_cluster=~\"$cluster\"}[$step]))", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Coordinator M3Msg Ingest Error", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "refresh": false, + "schemaVersion": 19, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "allValue": null, + "current": { + "tags": [], + "text": "dev", + "value": "dev" + }, + "definition": "label_values(up,m3_cluster)", + "hide": 0, + "includeAll": false, + "label": null, + "multi": false, + "name": "cluster", + "options": [], + "query": "label_values(up,m3_cluster)", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, + { + "allFormat": "glob", + "allValue": ".*", + "current": { + "text": "All", + "value": "$__all" + }, + "definition": "{__name__=~\"m3aggregator_aggregator_flush_handler.*\"}", + "hide": 0, + "includeAll": true, + "label": null, + "multi": true, + "multiFormat": "glob", + "name": "backend", + "options": [], + "query": 
"{m3_cluster=~\"$cluster\",__name__=~\"m3aggregator_aggregator_flush_handler.*\"}", + "refresh": 1, + "regex": "/backend=\\\"([^\"]+)\\\".*/", + "skipUrlSync": false, + "sort": 0, + "tag": "name", + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, + { + "allFormat": "glob", + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": "$__all" + }, + "hide": 0, + "includeAll": true, + "label": "", + "multi": true, + "multiFormat": "glob", + "name": "poolType", + "options": [ + { + "selected": true, + "text": "All", + "value": "$__all" + }, + { + "selected": false, + "text": "entry_pool", + "value": "entry_pool" + }, + { + "selected": false, + "text": "counter_elem_pool", + "value": "counter_elem_pool" + }, + { + "selected": false, + "text": "timer_elem_pool", + "value": "timer_elem_pool" + }, + { + "selected": false, + "text": "gauge_elem_pool", + "value": "gauge_elem_pool" + }, + { + "selected": false, + "text": "buffered_encoder_pool", + "value": "buffered_encoder_pool" + }, + { + "selected": false, + "text": "sample_pool", + "value": "sample_pool" + }, + { + "selected": false, + "text": "floats_pool", + "value": "floats_pool" + }, + { + "selected": false, + "text": "stream_pool", + "value": "stream_pool" + }, + { + "selected": false, + "text": "large_floats_pool", + "value": "large_floats_pool" + }, + { + "selected": false, + "text": "unaggregated_iterator_pool", + "value": "unaggregated_iterator_pool" + }, + { + "selected": false, + "text": "aggregation_types_pool", + "value": "aggregation_types_pool" + }, + { + "selected": false, + "text": "quantile_pool", + "value": "quantile_pool" + }, + { + "selected": false, + "text": "bytes_pool", + "value": "bytes_pool" + } + ], + "query": "entry_pool,counter_elem_pool,timer_elem_pool,gauge_elem_pool,buffered_encoder_pool,sample_pool,floats_pool,stream_pool,large_floats_pool,unaggregated_iterator_pool,aggregation_types_pool,quantile_pool,bytes_pool", + 
"refresh": 1, + "regex": "instance:(.*)", + "skipUrlSync": false, + "tag": "instance", + "type": "custom" + }, + { + "allFormat": "glob", + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": "{total,free,get-on-empty,put-on-full}" + }, + "hide": 0, + "includeAll": true, + "label": "", + "multi": true, + "multiFormat": "glob", + "name": "poolMetricType", + "options": [ + { + "selected": true, + "text": "All", + "value": "$__all" + }, + { + "selected": false, + "text": "total", + "value": "total" + }, + { + "selected": false, + "text": "free", + "value": "free" + }, + { + "selected": false, + "text": "get_on_empty", + "value": "get_on_empty" + }, + { + "selected": false, + "text": "put_on_full", + "value": "put_on_full" + } + ], + "query": "total,free,get_on_empty,put_on_full", + "refresh": 1, + "regex": "instance:(.*)", + "skipUrlSync": false, + "tag": "instance", + "type": "custom" + }, + { + "allFormat": "glob", + "allValue": ".*", + "current": { + "text": "All", + "value": "$__all" + }, + "definition": "m3aggregator_aggregator_flush_handler_placement_update{m3_cluster=~\"$cluster\",backend=\"m3msg\"}", + "hide": 0, + "includeAll": true, + "label": null, + "multi": true, + "multiFormat": "glob", + "name": "m3msg_consumer_service", + "options": [], + "query": "m3aggregator_aggregator_flush_handler_placement_update{m3_cluster=~\"$cluster\",backend=\"m3msg\"}", + "refresh": 1, + "regex": "/consumer_service_name=\"([^\"]+)\"/", + "skipUrlSync": false, + "sort": 0, + "tag": "name", + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, + { + "allFormat": "glob", + "allValue": ".*", + "current": { + "text": "All", + "value": "$__all" + }, + "definition": "m3aggregator_aggregator_flush_handler_placement_update{m3_cluster=~\"$cluster\",backend=\"m3msg\"}", + "hide": 0, + "includeAll": true, + "label": null, + "multi": true, + "multiFormat": "glob", + "name": "m3msg_consumer_env", + "options": [], + 
"query": "m3aggregator_aggregator_flush_handler_placement_update{m3_cluster=~\"$cluster\",backend=\"m3msg\"}", + "refresh": 1, + "regex": "/consumer_service_env=\"([^\"]+)\"/", + "skipUrlSync": false, + "sort": 0, + "tag": "name", + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, + { + "allValue": null, + "current": { + "tags": [], + "text": "1m", + "value": "1m" + }, + "hide": 0, + "includeAll": false, + "label": null, + "multi": false, + "name": "step", + "options": [ + { + "selected": false, + "text": "30s", + "value": "30s" + }, + { + "selected": true, + "text": "1m", + "value": "1m" + }, + { + "selected": false, + "text": "5m", + "value": "5m" + }, + { + "selected": false, + "text": "10m", + "value": "10m" + } + ], + "query": "30s,1m,5m,10m", + "skipUrlSync": false, + "type": "custom" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": { + "now": true, + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ], + "time_options": [ + "5m", + "15m", + "1h", + "6h", + "12h", + "24h", + "2d", + "7d", + "30d" + ] + }, + "timezone": "browser", + "title": "M3: Aggregator End-to-End Details", + "uid": "sqQiAbQZz", + "version": 9 +} \ No newline at end of file diff --git a/scripts/docker-integration-tests/aggregator/docker-compose.yml b/scripts/docker-integration-tests/aggregator/docker-compose.yml index 63cd26c375..c93b41ee25 100644 --- a/scripts/docker-integration-tests/aggregator/docker-compose.yml +++ b/scripts/docker-integration-tests/aggregator/docker-compose.yml @@ -19,6 +19,7 @@ services: - "7204" ports: - "0.0.0.0:7202:7202" + - "0.0.0.0:7203:7203" - "0.0.0.0:7204:7204" networks: - backend diff --git a/scripts/docker-integration-tests/aggregator/m3aggregator.yml b/scripts/docker-integration-tests/aggregator/m3aggregator.yml index 49a13a770b..58d0370d4e 100644 --- a/scripts/docker-integration-tests/aggregator/m3aggregator.yml +++ 
b/scripts/docker-integration-tests/aggregator/m3aggregator.yml @@ -71,7 +71,7 @@ rawtcp: kvClient: etcd: - env: default_env + env: override_test_env zone: embedded service: m3aggregator cacheDir: /var/lib/m3kv @@ -82,7 +82,7 @@ kvClient: runtimeOptions: kvConfig: - environment: default_env + environment: override_test_env zone: embedded writeValuesPerMetricLimitPerSecondKey: write-values-per-metric-limit-per-second writeValuesPerMetricLimitPerSecond: 0 @@ -131,7 +131,7 @@ aggregator: placementKV: namespace: /placement zone: embedded - environment: default_env + environment: override_test_env placementWatcher: key: m3aggregator initWatchTimeout: 15s @@ -164,7 +164,7 @@ aggregator: placementManager: kvConfig: namespace: /placement - environment: default_env + environment: override_test_env zone: embedded placementWatcher: key: m3aggregator @@ -175,7 +175,7 @@ aggregator: resignTimeout: 1m flushTimesManager: kvConfig: - environment: default_env + environment: override_test_env zone: embedded flushTimesKeyFmt: shardset/%d/flush flushTimesPersistRetrier: @@ -190,7 +190,7 @@ aggregator: ttlSeconds: 10 serviceID: name: m3aggregator - environment: default_env + environment: override_test_env zone: embedded electionKeyFmt: shardset/%d/lock campaignRetrier: @@ -243,7 +243,7 @@ aggregator: topicName: aggregated_metrics topicServiceOverride: zone: embedded - environment: default_env + environment: override_test_env messageRetry: initialBackoff: 1m maxBackoff: 2m diff --git a/scripts/docker-integration-tests/aggregator/m3coordinator.yml b/scripts/docker-integration-tests/aggregator/m3coordinator.yml index 2b1c9b68b7..475afe191f 100644 --- a/scripts/docker-integration-tests/aggregator/m3coordinator.yml +++ b/scripts/docker-integration-tests/aggregator/m3coordinator.yml @@ -56,6 +56,7 @@ downsample: client: placementKV: namespace: /placement + environment: override_test_env placementWatcher: key: m3aggregator initWatchTimeout: 10s diff --git 
a/scripts/docker-integration-tests/aggregator/test.sh b/scripts/docker-integration-tests/aggregator/test.sh index 32ce73f9e2..e0b8d7ce13 100755 --- a/scripts/docker-integration-tests/aggregator/test.sh +++ b/scripts/docker-integration-tests/aggregator/test.sh @@ -25,7 +25,7 @@ echo "Setup DB node" setup_single_m3db_node echo "Initializing aggregator topology" -curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init -d '{ +curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/services/m3aggregator/placement/init -d '{ "num_shards": 64, "replication_factor": 2, "instances": [ @@ -51,7 +51,7 @@ curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init }' echo "Initializing m3msg topic for m3coordinator ingestion from m3aggregators" -curl -vvvsSf -X POST localhost:7201/api/v1/topic/init -d '{ +curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{ "numberOfShards": 64 }' @@ -75,7 +75,7 @@ echo "Done validating topology" # Do this after placement for m3coordinator is created. echo "Adding m3coordinator as a consumer to the aggregator topic" -curl -vvvsSf -X POST localhost:7201/api/v1/topic -d '{ +curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{ "consumerService": { "serviceId": { "name": "m3coordinator", diff --git a/scripts/docker-integration-tests/run.sh b/scripts/docker-integration-tests/run.sh index c2ce9e9e08..3bbdbc5f12 100755 --- a/scripts/docker-integration-tests/run.sh +++ b/scripts/docker-integration-tests/run.sh @@ -17,6 +17,29 @@ TESTS=( scripts/docker-integration-tests/coordinator_config_rules/test.sh ) +# Some systems, including our default Buildkite hosts, don't come with netcat +# installed and we may not have perms to install it. "Install" it in the worst +# possible way. +if ! 
command -v nc && [[ "$BUILDKITE" == "true" ]]; then + echo "installing netcat" + NCDIR="$(mktemp -d)" + + yumdownloader --destdir "$NCDIR" --resolve nc + ( + cd "$NCDIR" + RPM=$(find . -maxdepth 1 -name '*.rpm' | tail -n1) + rpm2cpio "$RPM" | cpio -id + ) + + export PATH="$PATH:$NCDIR/usr/bin" + + function cleanup_nc() { + rm -rf "$NCDIR" + } + + trap cleanup_nc EXIT +fi + scripts/docker-integration-tests/setup.sh NUM_TESTS=${#TESTS[@]} diff --git a/src/aggregator/client/config.go b/src/aggregator/client/config.go index 8f69a5bc20..d99a553862 100644 --- a/src/aggregator/client/config.go +++ b/src/aggregator/client/config.go @@ -45,6 +45,7 @@ type Configuration struct { ShardCutoffLingerDuration *time.Duration `yaml:"shardCutoffLingerDuration"` Encoder EncoderConfiguration `yaml:"encoder"` FlushSize int `yaml:"flushSize"` + MaxBatchSize int `yaml:"maxBatchSize"` MaxTimerBatchSize int `yaml:"maxTimerBatchSize"` QueueSize int `yaml:"queueSize"` QueueDropType *DropType `yaml:"queueDropType"` @@ -127,6 +128,9 @@ func (c *Configuration) newClientOptions( if c.FlushSize != 0 { opts = opts.SetFlushSize(c.FlushSize) } + if c.MaxBatchSize != 0 { + opts = opts.SetMaxBatchSize(c.MaxBatchSize) + } if c.MaxTimerBatchSize != 0 { opts = opts.SetMaxTimerBatchSize(c.MaxTimerBatchSize) } diff --git a/src/aggregator/client/options.go b/src/aggregator/client/options.go index 40c635df61..14c338bc45 100644 --- a/src/aggregator/client/options.go +++ b/src/aggregator/client/options.go @@ -37,7 +37,9 @@ const ( // By default there is no limit on the timer batch size. defaultMaxTimerBatchSize = 0 - defaultInstanceQueueSize = 4096 + // defaultInstanceQueueSize determines how many metrics can be buffered + // before it must wait for an existing batch to be flushed to an instance. + defaultInstanceQueueSize = 2 << 15 // ~65k // By default traffic is cut over to shards 10 minutes before the designated // cutover time in case there are issues with the instances owning the shards. 
@@ -51,10 +53,10 @@ const ( // By default the oldest metrics in the queue are dropped when it is full. defaultDropType = DropOldest - // By default set maximum batch size to 32k - defaultMaxBatchSize = 2 << 14 + // By default set maximum batch size to 8MB. + defaultMaxBatchSize = 2 << 22 - // By default write at least every 100ms + // By default write at least every 100ms. defaultBatchFlushDeadline = 100 * time.Millisecond ) diff --git a/src/dbnode/integration/admin_session_fetch_blocks_test.go b/src/dbnode/integration/admin_session_fetch_blocks_test.go index 1332da05d0..0b64debbc1 100644 --- a/src/dbnode/integration/admin_session_fetch_blocks_test.go +++ b/src/dbnode/integration/admin_session_fetch_blocks_test.go @@ -27,8 +27,8 @@ import ( "time" "github.com/m3db/m3/src/dbnode/integration/generate" - "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -123,6 +123,32 @@ func testSetupMetadatas( return metadatasByShard } +func filterSeriesByShard( + testSetup *testSetup, + seriesMap map[xtime.UnixNano]generate.SeriesBlock, + desiredShards []uint32, +) map[xtime.UnixNano]generate.SeriesBlock { + filteredMap := make(map[xtime.UnixNano]generate.SeriesBlock) + for blockStart, series := range seriesMap { + filteredSeries := make([]generate.Series, 0, len(series)) + for _, serie := range series { + shard := testSetup.shardSet.Lookup(serie.ID) + for _, ss := range desiredShards { + if ss == shard { + filteredSeries = append(filteredSeries, serie) + break + } + } + } + + if len(filteredSeries) > 0 { + filteredMap[blockStart] = filteredSeries + } + } + + return filteredMap +} + func verifySeriesMapsEqual( t *testing.T, expectedSeriesMap map[xtime.UnixNano]generate.SeriesBlock, diff --git a/src/dbnode/integration/commitlog_bootstrap_unowned_shard_test.go
b/src/dbnode/integration/commitlog_bootstrap_unowned_shard_test.go new file mode 100644 index 0000000000..e815ee1c5a --- /dev/null +++ b/src/dbnode/integration/commitlog_bootstrap_unowned_shard_test.go @@ -0,0 +1,145 @@ +// +build integration + +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "fmt" + "testing" + "time" + + "github.com/m3db/m3/src/cluster/services" + "github.com/m3db/m3/src/cluster/shard" + "github.com/m3db/m3/src/dbnode/integration/fake" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/topology" + xtest "github.com/m3db/m3/src/x/test" + + "github.com/stretchr/testify/require" +) + +func TestCommitLogBootstrapUnownedShard(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + log := xtest.NewLogger(t) + retentionOpts := retention.NewOptions(). 
+ SetRetentionPeriod(20 * time.Hour). + SetBlockSize(2 * time.Hour). + SetBufferPast(10 * time.Minute). + SetBufferFuture(10 * time.Minute) + blockSize := retentionOpts.BlockSize() + + ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions(). + SetRetentionOptions(retentionOpts)) + require.NoError(t, err) + numShards := 6 + + // Helper function to create node instances for fake cluster service. + node := func(index int, shards []uint32) services.ServiceInstance { + id := fmt.Sprintf("testhost%d", index) + endpoint := fmt.Sprintf("127.0.0.1:%d", multiAddrPortStart+(index*multiAddrPortEach)) + + result := services.NewServiceInstance(). + SetInstanceID(id). + SetEndpoint(endpoint) + resultShards := make([]shard.Shard, len(shards)) + for i, id := range shards { + resultShards[i] = shard.NewShard(id).SetState(shard.Available) + } + return result.SetShards(shard.NewShards(resultShards)) + } + + // Pretend there are two nodes sharing 6 shards (RF1). + node0OwnedShards := []uint32{0, 1, 2} + svc := fake.NewM3ClusterService(). + SetInstances([]services.ServiceInstance{ + node(0, node0OwnedShards), + node(1, []uint32{3, 4, 5}), + }). + SetReplication(services.NewServiceReplication().SetReplicas(1)). + SetSharding(services.NewServiceSharding().SetNumShards(numShards)) + svcs := fake.NewM3ClusterServices() + svcs.RegisterService("m3db", svc) + topoOpts := topology.NewDynamicOptions(). + SetConfigServiceClient(fake.NewM3ClusterClient(svcs, nil)) + topoInit := topology.NewDynamicInitializer(topoOpts) + + opts := newTestOptions(t). + SetNamespaces([]namespace.Metadata{ns1}). 
+ SetNumShards(numShards) + setupOpts := []bootstrappableTestSetupOptions{ + {disablePeersBootstrapper: true, topologyInitializer: topoInit}, + {disablePeersBootstrapper: true, topologyInitializer: topoInit}, + } + + setups, closeFn := newDefaultBootstrappableTestSetups(t, opts, setupOpts) + defer closeFn() + + // Only set this up for the first setup because we're only writing commit + // logs for the first server. + setup := setups[0] + commitLogOpts := setup.storageOpts.CommitLogOptions(). + SetFlushInterval(defaultIntegrationTestFlushInterval) + setup.storageOpts = setup.storageOpts.SetCommitLogOptions(commitLogOpts) + + log.Info("generating data") + now := setup.getNowFn() + seriesMaps := generateSeriesMaps(30, nil, now.Add(-2*blockSize), now.Add(-blockSize)) + log.Info("writing data") + // Write commit log with generated data that spreads across all shards + // (including shards that this node should not own). This node should still + // be able to bootstrap successfully with commit log entries from shards + // that it does not own. + writeCommitLogData(t, setup, commitLogOpts, seriesMaps, ns1, false) + log.Info("finished writing data") + + // Setup bootstrapper after writing data so filesystem inspection can find it. + setupCommitLogBootstrapperWithFSInspection(t, setup, commitLogOpts) + + // Start the servers. + for _, setup := range setups { + require.NoError(t, setup.startServer()) + } + + // Defer stop the servers. + defer func() { + setups.parallel(func(s *testSetup) { + require.NoError(t, s.stopServer()) + }) + log.Debug("servers are now down") + }() + + // Only fetch blocks for shards owned by node 0. 
+ metadatasByShard, err := m3dbClientFetchBlocksMetadata( + setup.m3dbVerificationAdminClient, testNamespaces[0], node0OwnedShards, + now.Add(-2*blockSize), now, topology.ReadConsistencyLevelMajority) + require.NoError(t, err) + + observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns1, metadatasByShard) + // Filter out the written series that node 0 does not own. + filteredSeriesMaps := filterSeriesByShard(setup, seriesMaps, node0OwnedShards) + // Expect to only see data that node 0 owns. + verifySeriesMapsEqual(t, filteredSeriesMaps, observedSeriesMaps) +} diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index c09c410a4f..aa6cede467 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -1225,6 +1225,7 @@ func withEncodingAndPoolingOptions( SetInstrumentOptions(iopts.SetMetricsScope(scope.SubScope("bytes-pool"))) checkedBytesPoolOpts := bytesPoolOpts. SetInstrumentOptions(iopts.SetMetricsScope(scope.SubScope("checked-bytes-pool"))) + buckets := make([]pool.Bucket, len(policy.BytesPool.Buckets)) for i, bucket := range policy.BytesPool.Buckets { var b pool.Bucket @@ -1234,10 +1235,12 @@ func withEncodingAndPoolingOptions( SetRefillLowWatermark(bucket.RefillLowWaterMarkOrDefault()). 
SetRefillHighWatermark(bucket.RefillHighWaterMarkOrDefault()) buckets[i] = b - logger.Sugar().Infof("bytes pool registering bucket capacity=%d, size=%d, "+ - "refillLowWatermark=%f, refillHighWatermark=%f", - bucket.Capacity, bucket.Size, - bucket.RefillLowWaterMarkOrDefault(), bucket.RefillHighWaterMarkOrDefault()) + + logger.Info("bytes pool configured", + zap.Int("capacity", bucket.CapacityOrDefault()), + zap.Int("size", bucket.SizeOrDefault()), + zap.Float64("refillLowWaterMark", bucket.RefillLowWaterMarkOrDefault()), + zap.Float64("refillHighWaterMark", bucket.RefillHighWaterMarkOrDefault())) } var bytesPool pool.CheckedBytesPool @@ -1260,9 +1263,9 @@ func withEncodingAndPoolingOptions( l = l.With(zap.String("policy", string(*t))) } - l.Info("bytes pool init") + l.Info("bytes pool init start") bytesPool.Init() - l.Info("bytes pool init done") + l.Info("bytes pool init end") } segmentReaderPool := xio.NewSegmentReaderPool( diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index 9834ee805c..9c2d3047f8 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -82,6 +82,7 @@ type bootstrapManager struct { state BootstrapState hasPending bool status tally.Gauge + bootstrapDuration tally.Timer lastBootstrapCompletionTime time.Time } @@ -92,14 +93,15 @@ func newBootstrapManager( ) databaseBootstrapManager { scope := opts.InstrumentOptions().MetricsScope() m := &bootstrapManager{ - database: database, - mediator: mediator, - opts: opts, - log: opts.InstrumentOptions().Logger(), - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - processProvider: opts.BootstrapProcessProvider(), - status: scope.Gauge("bootstrapped"), + database: database, + mediator: mediator, + opts: opts, + log: opts.InstrumentOptions().Logger(), + nowFn: opts.ClockOptions().NowFn(), + sleepFn: time.Sleep, + processProvider: opts.BootstrapProcessProvider(), + status: scope.Gauge("bootstrapped"), + bootstrapDuration: 
scope.Timer("bootstrap-duration"), } m.bootstrapFn = m.bootstrap return m @@ -330,8 +332,10 @@ func (m *bootstrapManager) bootstrap() error { // Run the bootstrap. bootstrapResult, err := process.Run(start, targets) + bootstrapDuration := m.nowFn().Sub(start) + m.bootstrapDuration.Record(bootstrapDuration) logFields = append(logFields, - zap.Duration("bootstrapDuration", m.nowFn().Sub(start))) + zap.Duration("bootstrapDuration", bootstrapDuration)) if err != nil { m.log.Error("bootstrap failed", diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index b5a804b713..374b951130 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/dbnode/storage/bootstrap/types.go -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -162,12 +162,13 @@ func (m *MockNamespaceDataAccumulator) EXPECT() *MockNamespaceDataAccumulatorMoc } // CheckoutSeriesWithoutLock mocks base method -func (m *MockNamespaceDataAccumulator) CheckoutSeriesWithoutLock(shardID uint32, id ident.ID, tags ident.TagIterator) (CheckoutSeriesResult, error) { +func (m *MockNamespaceDataAccumulator) CheckoutSeriesWithoutLock(shardID uint32, id ident.ID, tags ident.TagIterator) (CheckoutSeriesResult, bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CheckoutSeriesWithoutLock", shardID, id, tags) ret0, _ := ret[0].(CheckoutSeriesResult) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // CheckoutSeriesWithoutLock indicates an expected call of CheckoutSeriesWithoutLock @@ -177,12 +178,13 @@ func (mr 
*MockNamespaceDataAccumulatorMockRecorder) CheckoutSeriesWithoutLock(sh } // CheckoutSeriesWithLock mocks base method -func (m *MockNamespaceDataAccumulator) CheckoutSeriesWithLock(shardID uint32, id ident.ID, tags ident.TagIterator) (CheckoutSeriesResult, error) { +func (m *MockNamespaceDataAccumulator) CheckoutSeriesWithLock(shardID uint32, id ident.ID, tags ident.TagIterator) (CheckoutSeriesResult, bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CheckoutSeriesWithLock", shardID, id, tags) ret0, _ := ret[0].(CheckoutSeriesResult) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // CheckoutSeriesWithLock indicates an expected call of CheckoutSeriesWithLock diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index fb8097e770..6f0ab39023 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -91,8 +91,9 @@ type seriesMapKey struct { } type seriesMapEntry struct { - namespace *bootstrapNamespace - series bootstrap.CheckoutSeriesResult + shardNoLongerOwned bool + namespace *bootstrapNamespace + series bootstrap.CheckoutSeriesResult } // accumulateArg contains all the information a worker go-routine needs to @@ -289,6 +290,7 @@ func (s *commitLogSource) Read( } datapointsSkippedNotBootstrappingNamespace = 0 datapointsSkippedNotBootstrappingShard = 0 + datapointsSkippedShardNoLongerOwned = 0 startCommitLogsRead = s.nowFn() ) s.log.Info("read commit logs start") @@ -301,7 +303,8 @@ func (s *commitLogSource) Read( zap.Stringer("took", s.nowFn().Sub(startCommitLogsRead)), zap.Int("datapointsRead", datapointsRead), zap.Int("datapointsSkippedNotBootstrappingNamespace", datapointsSkippedNotBootstrappingNamespace), - zap.Int("datapointsSkippedNotBootstrappingShard", datapointsSkippedNotBootstrappingShard)) + 
zap.Int("datapointsSkippedNotBootstrappingShard", datapointsSkippedNotBootstrappingShard), + zap.Int("datapointsSkippedShardNoLongerOwned", datapointsSkippedShardNoLongerOwned)) }() iter, corruptFiles, err := s.newIteratorFn(iterOpts) @@ -450,13 +453,19 @@ func (s *commitLogSource) Read( // Check out the series for writing, no need for concurrency // as commit log bootstrapper does not perform parallel // checking out of series. - series, err := accumulator.CheckoutSeriesWithoutLock( + series, owned, err := accumulator.CheckoutSeriesWithoutLock( entry.Series.Shard, entry.Series.ID, - tagIter, - ) - + tagIter) if err != nil { + if !owned { + // If we encounter a log entry for a shard that we're + // not responsible for, skip this entry. This can occur + // when a topology change happens and we bootstrap from + // a commit log which contains this data. + commitLogSeries[seriesKey] = seriesMapEntry{shardNoLongerOwned: true} + continue + } return bootstrap.NamespaceResults{}, err } @@ -469,11 +478,19 @@ func (s *commitLogSource) Read( commitLogSeries[seriesKey] = seriesEntry } + // If series is no longer owned, then we can safely skip trying to + // bootstrap the result. + if seriesEntry.shardNoLongerOwned { + datapointsSkippedShardNoLongerOwned++ + continue + } + // If not bootstrapping this namespace then skip this result. if !seriesEntry.namespace.bootstrapping { datapointsSkippedNotBootstrappingNamespace++ continue } + // If not bootstrapping shard for this series then also skip. // NB(r): This can occur when a topology change happens then we // bootstrap from the commit log data that the node no longer owns. @@ -783,8 +800,12 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } // NB(r): No parallelization required to checkout the series. 
- ref, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) + ref, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) if err != nil { + if !owned { + // Skip bootstrapping this series if we don't own it. + continue + } return err } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 1f255b7d2a..d066de6735 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -584,8 +584,12 @@ func (s *fileSystemSource) readNextEntryAndRecordBlock( return fmt.Errorf("error reading data file: %v", err) } - ref, err := accumulator.CheckoutSeriesWithLock(shardID, id, tagsIter) + ref, owned, err := accumulator.CheckoutSeriesWithLock(shardID, id, tagsIter) if err != nil { + if !owned { + // Ignore if we no longer own the shard for this series. + return nil + } return fmt.Errorf("unable to checkout series: %v", err) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 8a4aa1ac1c..e78f323595 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -396,8 +396,13 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( for _, elem := range shardResult.AllSeries().Iter() { entry := elem.Value() tagsIter.Reset(entry.Tags) - ref, err := accumulator.CheckoutSeriesWithLock(shard, entry.ID, tagsIter) + ref, owned, err := accumulator.CheckoutSeriesWithLock(shard, entry.ID, tagsIter) if err != nil { + if !owned { + // Only if we own this shard do we consider this an + // error in bootstrapping.
+ continue + } unfulfill(currRange) s.log.Error("could not checkout series", zap.Error(err)) continue diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index fecced0898..2a42a4643f 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -226,8 +226,14 @@ type NamespaceDataAccumulator interface { // CheckoutSeriesWithoutLock retrieves a series for writing to // and when the accumulator is closed it will ensure that the // series is released. + // // If indexing is not enabled, tags is still required, simply pass // ident.EmptyTagIterator. + // + // Returns the result, whether the node owns the specified shard, along with + // an error if any. This allows callers to handle unowned shards differently + // than other errors. If owned == false, err should not be nil. + // // Note: Without lock variant does not perform any locking and callers // must ensure non-parallel access themselves, this helps avoid // overhead of the lock for the commit log bootstrapper which reads @@ -236,20 +242,17 @@ type NamespaceDataAccumulator interface { shardID uint32, id ident.ID, tags ident.TagIterator, - ) (CheckoutSeriesResult, error) + ) (result CheckoutSeriesResult, owned bool, err error) - // CheckoutSeriesWithLock retrieves a series for writing to - // and when the accumulator is closed it will ensure that the - // series is released. - // If indexing is not enabled, tags is still required, simply pass - // ident.EmptyTagIterator. - // Note: With lock variant perform locking and callers do not need + // CheckoutSeriesWithLock is the "with lock" version of + // CheckoutSeriesWithoutLock. + // Note: With lock variant performs locking and callers do not need // to be concerned about parallel access. 
CheckoutSeriesWithLock( shardID uint32, id ident.ID, tags ident.TagIterator, - ) (CheckoutSeriesResult, error) + ) (result CheckoutSeriesResult, owned bool, err error) // Close will close the data accumulator and will release // all series read/write refs. diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index d8578f18ed..f213f4bca3 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -186,7 +186,7 @@ func (a *TestDataAccumulator) CheckoutSeriesWithLock( shardID uint32, id ident.ID, tags ident.TagIterator, -) (CheckoutSeriesResult, error) { +) (CheckoutSeriesResult, bool, error) { a.Lock() defer a.Unlock() return a.checkoutSeriesWithLock(shardID, id, tags) @@ -199,7 +199,7 @@ func (a *TestDataAccumulator) CheckoutSeriesWithoutLock( shardID uint32, id ident.ID, tags ident.TagIterator, -) (CheckoutSeriesResult, error) { +) (CheckoutSeriesResult, bool, error) { return a.checkoutSeriesWithLock(shardID, id, tags) } @@ -207,7 +207,7 @@ func (a *TestDataAccumulator) checkoutSeriesWithLock( shardID uint32, id ident.ID, tags ident.TagIterator, -) (CheckoutSeriesResult, error) { +) (CheckoutSeriesResult, bool, error) { var decodedTags map[string]string if tags != nil { decodedTags = make(map[string]string, tags.Len()) @@ -221,7 +221,7 @@ func (a *TestDataAccumulator) checkoutSeriesWithLock( } if err := tags.Err(); err != nil { - return CheckoutSeriesResult{}, err + return CheckoutSeriesResult{}, false, err } } else { // Ensure the decoded tags aren't nil. @@ -230,7 +230,7 @@ func (a *TestDataAccumulator) checkoutSeriesWithLock( stringID := id.String() if result, found := a.results[stringID]; found { - return result, nil + return result, true, nil } var streamErr error @@ -286,7 +286,7 @@ func (a *TestDataAccumulator) checkoutSeriesWithLock( } a.results[stringID] = result - return result, streamErr + return result, true, streamErr } // Release is a no-op on the test accumulator. 
diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index e135696232..fc672976a3 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -672,18 +672,20 @@ func (n *dbNamespace) SeriesReadWriteRef( shardID uint32, id ident.ID, tags ident.TagIterator, -) (SeriesReadWriteRef, error) { +) (SeriesReadWriteRef, bool, error) { n.RLock() - shard, err := n.shardAtWithRLock(shardID) + shard, owned, err := n.shardAtWithRLock(shardID) n.RUnlock() if err != nil { - return SeriesReadWriteRef{}, err + return SeriesReadWriteRef{}, owned, err } opts := ShardSeriesReadWriteRefOptions{ ReverseIndex: n.reverseIndex != nil, } - return shard.SeriesReadWriteRef(id, tags, opts) + + res, err := shard.SeriesReadWriteRef(id, tags, opts) + return res, true, err } func (n *dbNamespace) QueryIDs( @@ -1371,7 +1373,7 @@ func (n *dbNamespace) shardFor(id ident.ID) (databaseShard, namespace.Context, e n.RLock() nsCtx := n.nsContextWithRLock() shardID := n.shardSet.Lookup(id) - shard, err := n.shardAtWithRLock(shardID) + shard, _, err := n.shardAtWithRLock(shardID) n.RUnlock() return shard, nsCtx, err } @@ -1393,23 +1395,23 @@ func (n *dbNamespace) readableShardAt(shardID uint32) (databaseShard, namespace. return shard, nsCtx, err } -func (n *dbNamespace) shardAtWithRLock(shardID uint32) (databaseShard, error) { +func (n *dbNamespace) shardAtWithRLock(shardID uint32) (databaseShard, bool, error) { // NB(r): These errors are retryable as they will occur // during a topology change and must be retried by the client. 
if int(shardID) >= len(n.shards) { - return nil, xerrors.NewRetryableError( + return nil, false, xerrors.NewRetryableError( fmt.Errorf("not responsible for shard %d", shardID)) } shard := n.shards[shardID] if shard == nil { - return nil, xerrors.NewRetryableError( + return nil, false, xerrors.NewRetryableError( fmt.Errorf("not responsible for shard %d", shardID)) } - return shard, nil + return shard, true, nil } func (n *dbNamespace) readableShardAtWithRLock(shardID uint32) (databaseShard, error) { - shard, err := n.shardAtWithRLock(shardID) + shard, _, err := n.shardAtWithRLock(shardID) if err != nil { return nil, err } @@ -1471,7 +1473,7 @@ func (n *dbNamespace) BootstrapState() ShardBootstrapStates { func (n *dbNamespace) FlushState(shardID uint32, blockStart time.Time) (fileOpState, error) { n.RLock() defer n.RUnlock() - shard, err := n.shardAtWithRLock(shardID) + shard, _, err := n.shardAtWithRLock(shardID) if err != nil { return fileOpState{}, err } diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go index 418b268569..417eed9b9c 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go @@ -54,10 +54,10 @@ func (a *namespaceDataAccumulator) CheckoutSeriesWithoutLock( shardID uint32, id ident.ID, tags ident.TagIterator, -) (bootstrap.CheckoutSeriesResult, error) { - ref, err := a.namespace.SeriesReadWriteRef(shardID, id, tags) +) (bootstrap.CheckoutSeriesResult, bool, error) { + ref, owned, err := a.namespace.SeriesReadWriteRef(shardID, id, tags) if err != nil { - return bootstrap.CheckoutSeriesResult{}, err + return bootstrap.CheckoutSeriesResult{}, owned, err } a.needsRelease = append(a.needsRelease, ref.ReleaseReadWriteRef) @@ -65,18 +65,18 @@ func (a *namespaceDataAccumulator) CheckoutSeriesWithoutLock( Series: ref.Series, Shard: ref.Shard, UniqueIndex: ref.UniqueIndex, - }, nil + }, true, nil } func 
(a *namespaceDataAccumulator) CheckoutSeriesWithLock( shardID uint32, id ident.ID, tags ident.TagIterator, -) (bootstrap.CheckoutSeriesResult, error) { +) (bootstrap.CheckoutSeriesResult, bool, error) { a.Lock() - result, err := a.CheckoutSeriesWithoutLock(shardID, id, tags) + result, owned, err := a.CheckoutSeriesWithoutLock(shardID, id, tags) a.Unlock() - return result, err + return result, owned, err } func (a *namespaceDataAccumulator) Close() error { diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go index 1764a5ba3e..ab24c1a2e8 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go @@ -56,7 +56,8 @@ func checkoutWithLock( id ident.ID, tags ident.TagIterator, ) (bootstrap.CheckoutSeriesResult, error) { - return acc.CheckoutSeriesWithLock(shardID, id, tags) + res, _, err := acc.CheckoutSeriesWithLock(shardID, id, tags) + return res, err } func checkoutWithoutLock( @@ -65,7 +66,8 @@ func checkoutWithoutLock( id ident.ID, tags ident.TagIterator, ) (bootstrap.CheckoutSeriesResult, error) { - return acc.CheckoutSeriesWithoutLock(shardID, id, tags) + res, _, err := acc.CheckoutSeriesWithoutLock(shardID, id, tags) + return res, err } func TestCheckoutSeries(t *testing.T) { @@ -94,9 +96,9 @@ func testCheckoutSeries(t *testing.T, checkoutFn checkoutFn) { } ) - ns.EXPECT().SeriesReadWriteRef(shardID, id, tagIter).Return(ref, nil) + ns.EXPECT().SeriesReadWriteRef(shardID, id, tagIter).Return(ref, true, nil) ns.EXPECT().SeriesReadWriteRef(shardID, idErr, tagIter). 
- Return(SeriesReadWriteRef{}, errors.New("err")) + Return(SeriesReadWriteRef{}, false, errors.New("err")) _, err := checkoutFn(acc, shardID, idErr, tagIter) require.Error(t, err) @@ -141,7 +143,7 @@ func testAccumulatorRelease(t *testing.T, checkoutFn checkoutFn) { } ) - ns.EXPECT().SeriesReadWriteRef(shardID, id, tagIter).Return(ref, nil) + ns.EXPECT().SeriesReadWriteRef(shardID, id, tagIter).Return(ref, true, nil) _, err = checkoutFn(acc, shardID, id, tagIter) require.NoError(t, err) diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 63af25644b..ac700625b3 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1461,12 +1461,13 @@ func (mr *MockdatabaseNamespaceMockRecorder) FlushState(shardID, blockStart inte } // SeriesReadWriteRef mocks base method -func (m *MockdatabaseNamespace) SeriesReadWriteRef(shardID uint32, id ident.ID, tags ident.TagIterator) (SeriesReadWriteRef, error) { +func (m *MockdatabaseNamespace) SeriesReadWriteRef(shardID uint32, id ident.ID, tags ident.TagIterator) (SeriesReadWriteRef, bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SeriesReadWriteRef", shardID, id, tags) ret0, _ := ret[0].(SeriesReadWriteRef) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // SeriesReadWriteRef indicates an expected call of SeriesReadWriteRef diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 0cd69de9ae..4f0c14cf99 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -388,7 +388,7 @@ type databaseNamespace interface { shardID uint32, id ident.ID, tags ident.TagIterator, - ) (SeriesReadWriteRef, error) + ) (result SeriesReadWriteRef, owned bool, err error) } // SeriesReadWriteRef is a read/write reference for a series, diff --git a/src/query/api/v1/handler/topic/add.go b/src/query/api/v1/handler/topic/add.go index a24d4bef30..9f37115b9a 
100644 --- a/src/query/api/v1/handler/topic/add.go +++ b/src/query/api/v1/handler/topic/add.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" @@ -73,7 +74,9 @@ func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - service, err := h.serviceFn(h.client) + serviceCfg := handleroptions.ServiceNameAndDefaults{} + svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil) + service, err := h.serviceFn(h.client, svcOpts) if err != nil { logger.Error("unable to get service", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) diff --git a/src/query/api/v1/handler/topic/common.go b/src/query/api/v1/handler/topic/common.go index 6154085ff8..1ae2a1bcc6 100644 --- a/src/query/api/v1/handler/topic/common.go +++ b/src/query/api/v1/handler/topic/common.go @@ -25,8 +25,10 @@ import ( "strings" clusterclient "github.com/m3db/m3/src/cluster/client" + "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" @@ -43,7 +45,7 @@ const ( HeaderTopicName = "topic-name" ) -type serviceFn func(clusterClient clusterclient.Client) (topic.Service, error) +type serviceFn func(clusterClient clusterclient.Client, opts handleroptions.ServiceOptions) (topic.Service, error) // Handler represents a generic handler for topic endpoints. 
// nolint: structcheck @@ -57,11 +59,14 @@ type Handler struct { } // Service gets a topic service from m3cluster client -func Service(clusterClient clusterclient.Client) (topic.Service, error) { - return topic.NewService( - topic.NewServiceOptions(). - SetConfigService(clusterClient), - ) +func Service(clusterClient clusterclient.Client, opts handleroptions.ServiceOptions) (topic.Service, error) { + kvOverride := kv.NewOverrideOptions(). + SetEnvironment(opts.ServiceEnvironment). + SetZone(opts.ServiceZone) + topicOpts := topic.NewServiceOptions(). + SetConfigService(clusterClient). + SetKVOverrideOptions(kvOverride) + return topic.NewService(topicOpts) } // RegisterRoutes registers the topic routes diff --git a/src/query/api/v1/handler/topic/common_test.go b/src/query/api/v1/handler/topic/common_test.go index 62b4580276..77ba308bb2 100644 --- a/src/query/api/v1/handler/topic/common_test.go +++ b/src/query/api/v1/handler/topic/common_test.go @@ -26,6 +26,7 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/msg/generated/proto/topicpb" "github.com/m3db/m3/src/msg/topic" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/gogo/protobuf/jsonpb" "github.com/golang/mock/gomock" @@ -46,7 +47,7 @@ func validateEqualTopicProto(t *testing.T, this, other topicpb.Topic) { } func testServiceFn(s topic.Service) serviceFn { - return func(clusterClient clusterclient.Client) (topic.Service, error) { + return func(clusterClient clusterclient.Client, opts handleroptions.ServiceOptions) (topic.Service, error) { return s, nil } } diff --git a/src/query/api/v1/handler/topic/delete.go b/src/query/api/v1/handler/topic/delete.go index c7056f7abb..6af4cc1c10 100644 --- a/src/query/api/v1/handler/topic/delete.go +++ b/src/query/api/v1/handler/topic/delete.go @@ -26,6 +26,7 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cmd/services/m3query/config" 
"github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" @@ -65,7 +66,9 @@ func (h *DeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { logger = logging.WithContext(ctx, h.instrumentOpts) ) - service, err := h.serviceFn(h.client) + serviceCfg := handleroptions.ServiceNameAndDefaults{} + svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil) + service, err := h.serviceFn(h.client, svcOpts) if err != nil { logger.Error("unable to get service", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) diff --git a/src/query/api/v1/handler/topic/get.go b/src/query/api/v1/handler/topic/get.go index 095a9a11c0..b6b8d06972 100644 --- a/src/query/api/v1/handler/topic/get.go +++ b/src/query/api/v1/handler/topic/get.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" @@ -66,7 +67,9 @@ func (h *GetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { logger = logging.WithContext(ctx, h.instrumentOpts) ) - service, err := h.serviceFn(h.client) + serviceCfg := handleroptions.ServiceNameAndDefaults{} + svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil) + service, err := h.serviceFn(h.client, svcOpts) if err != nil { logger.Error("unable to get service", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) diff --git a/src/query/api/v1/handler/topic/init.go b/src/query/api/v1/handler/topic/init.go index c041c28c10..41e186fb25 100644 --- a/src/query/api/v1/handler/topic/init.go +++ 
b/src/query/api/v1/handler/topic/init.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" @@ -73,7 +74,9 @@ func (h *InitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - service, err := h.serviceFn(h.client) + serviceCfg := handleroptions.ServiceNameAndDefaults{} + svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil) + service, err := h.serviceFn(h.client, svcOpts) if err != nil { logger.Error("unable to get service", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) diff --git a/src/query/server/query.go b/src/query/server/query.go index 08215dfb2c..d55f91a25f 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -70,7 +70,6 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/uber-go/tally" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -147,7 +146,14 @@ func Run(runOpts RunOptions) { xconfig.WarnOnDeprecation(cfg, logger) - scope, closer, err := cfg.Metrics.NewRootScope() + scope, closer, _, err := cfg.Metrics.NewRootScopeAndReporters( + instrument.NewRootScopeAndReportersOptions{ + OnError: func(err error) { + // NB(r): Required otherwise collisions when registering metrics will + // cause a panic. + logger.Error("register metric error", zap.Error(err)) + }, + }) if err != nil { logger.Fatal("could not connect to metrics", zap.Error(err)) } @@ -517,6 +523,10 @@ func newDownsampler( tagOptions models.TagOptions, instrumentOpts instrument.Options, ) (downsample.Downsampler, error) { + // Namespace the downsampler metrics. 
+ instrumentOpts = instrumentOpts.SetMetricsScope( + instrumentOpts.MetricsScope().SubScope("downsampler")) + if clusterManagementClient == nil { return nil, fmt.Errorf("no configured cluster management config, " + "must set this config for downsampler") @@ -540,13 +550,12 @@ func newDownsampler( SubScope("tag-decoder-pool"))) downsampler, err := cfg.NewDownsampler(downsample.DownsamplerOptions{ - Storage: storage, - ClusterClient: clusterManagementClient, - RulesKVStore: kvStore, - AutoMappingRules: autoMappingRules, - ClockOptions: clock.NewOptions(), - // TODO: remove after https://github.com/m3db/m3/issues/992 is fixed - InstrumentOptions: instrumentOpts.SetMetricsScope(tally.NoopScope), + Storage: storage, + ClusterClient: clusterManagementClient, + RulesKVStore: kvStore, + AutoMappingRules: autoMappingRules, + ClockOptions: clock.NewOptions(), + InstrumentOptions: instrumentOpts, TagEncoderOptions: tagEncoderOptions, TagDecoderOptions: tagDecoderOptions, TagEncoderPoolOptions: tagEncoderPoolOptions,