diff --git a/scripts/docker-integration-tests/common.sh b/scripts/docker-integration-tests/common.sh
index d41b390fbc..dfccdf1d01 100644
--- a/scripts/docker-integration-tests/common.sh
+++ b/scripts/docker-integration-tests/common.sh
@@ -44,17 +44,9 @@ function retry_with_backoff {
 }

 function setup_single_m3db_node {
-  wait_for_db_init
-}
-
-function setup_two_m3db_nodes {
-  local dbnode_id_1=${DBNODE_ID_01:-m3db_local_1}
-  local dbnode_id_2=${DBNODE_ID_02:-m3db_local_2}
-  local dbnode_host_1=${DBNODE_HOST_01:-dbnode01}
-  local dbnode_host_2=${DBNODE_HOST_02:-dbnode02}
+  local dbnode_host=${DBNODE_HOST:-dbnode01}
   local dbnode_port=${DBNODE_PORT:-9000}
-  local dbnode_host_1_health_port=${DBNODE_HEALTH_PORT_01:-9012}
-  local dbnode_host_2_health_port=${DBNODE_HEALTH_PORT_02:-9022}
+  local dbnode_health_port=${DBNODE_HEALTH_PORT:-9002}
   local coordinator_port=${COORDINATOR_PORT:-7201}

   echo "Wait for API to be available"
@@ -66,23 +58,15 @@ function setup_two_m3db_nodes {
     "type": "cluster",
     "namespaceName": "agg",
     "retentionTime": "6h",
-    "num_shards": 2,
-    "replicationFactor": 2,
+    "num_shards": 4,
+    "replicationFactor": 1,
     "hosts": [
         {
-            "id": "'"${dbnode_id_1}"'",
+            "id": "m3db_local",
             "isolation_group": "rack-a",
             "zone": "embedded",
             "weight": 1024,
-            "address": "'"${dbnode_host_1}"'",
-            "port": '"${dbnode_port}"'
-        },
-        {
-            "id": "'"${dbnode_id_2}"'",
-            "isolation_group": "rack-b",
-            "zone": "embedded",
-            "weight": 1024,
-            "address": "'"${dbnode_host_2}"'",
+            "address": "'"${dbnode_host}"'",
             "port": '"${dbnode_port}"'
         }
     ]
@@ -90,21 +74,23 @@ function setup_two_m3db_nodes {

   echo "Wait until placement is init'd"
   ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \
-    '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.'"${dbnode_id_1}"'.id)" == \"'"${dbnode_id_1}"'\" ]'
+    '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]'

   wait_for_namespaces

   echo "Wait until bootstrapped"
   ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \
-    '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_1_health_port}"'/health | jq .bootstrapped)" == true ]'
-  ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \
-    '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_2_health_port}"'/health | jq .bootstrapped)" == true ]'
+    '[ "$(curl -sSf 0.0.0.0:'"${dbnode_health_port}"'/health | jq .bootstrapped)" == true ]'
 }

-function wait_for_db_init {
-  local dbnode_host=${DBNODE_HOST:-dbnode01}
+function setup_two_m3db_nodes {
+  local dbnode_id_1=${DBNODE_ID_01:-m3db_local_1}
+  local dbnode_id_2=${DBNODE_ID_02:-m3db_local_2}
+  local dbnode_host_1=${DBNODE_HOST_01:-dbnode01}
+  local dbnode_host_2=${DBNODE_HOST_02:-dbnode02}
   local dbnode_port=${DBNODE_PORT:-9000}
-  local dbnode_health_port=${DBNODE_HEALTH_PORT:-9002}
+  local dbnode_host_1_health_port=${DBNODE_HEALTH_PORT_01:-9012}
+  local dbnode_host_2_health_port=${DBNODE_HEALTH_PORT_02:-9022}
   local coordinator_port=${COORDINATOR_PORT:-7201}

   echo "Wait for API to be available"
@@ -116,15 +102,23 @@ function wait_for_db_init {
     "type": "cluster",
     "namespaceName": "agg",
     "retentionTime": "6h",
-    "num_shards": 4,
-    "replicationFactor": 1,
+    "num_shards": 2,
+    "replicationFactor": 2,
     "hosts": [
         {
-            "id": "m3db_local",
+            "id": "'"${dbnode_id_1}"'",
             "isolation_group": "rack-a",
             "zone": "embedded",
             "weight": 1024,
-            "address": "'"${dbnode_host}"'",
+            "address": "'"${dbnode_host_1}"'",
+            "port": '"${dbnode_port}"'
+        },
+        {
+            "id": "'"${dbnode_id_2}"'",
+            "isolation_group": "rack-b",
+            "zone": "embedded",
+            "weight": 1024,
+            "address": "'"${dbnode_host_2}"'",
             "port": '"${dbnode_port}"'
         }
     ]
@@ -132,13 +126,15 @@ function wait_for_db_init {

   echo "Wait until placement is init'd"
   ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \
-    '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]'
+    '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.'"${dbnode_id_1}"'.id)" == \"'"${dbnode_id_1}"'\" ]'

   wait_for_namespaces

   echo "Wait until bootstrapped"
   ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \
-    '[ "$(curl -sSf 0.0.0.0:'"${dbnode_health_port}"'/health | jq .bootstrapped)" == true ]'
+    '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_1_health_port}"'/health | jq .bootstrapped)" == true ]'
+  ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \
+    '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_2_health_port}"'/health | jq .bootstrapped)" == true ]'
 }

 function wait_for_namespaces {
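Everything in these setup helpers funnels through retry_with_backoff, whose body sits just above this hunk (see the @@ context line). As used here, its contract is: evaluate the quoted shell condition up to ATTEMPTS times, sleeping TIMEOUT seconds between tries and doubling the sleep up to a cap of MAX_TIMEOUT. A minimal standalone sketch of that pattern, under a hypothetical name — the real helper lives in common.sh and may differ in detail:

function retry_with_backoff_sketch {
  local attempts=${ATTEMPTS:-5}        # total tries before giving up
  local timeout=${TIMEOUT:-1}          # initial sleep between tries, seconds
  local max_timeout=${MAX_TIMEOUT:-4}  # backoff cap, seconds
  local attempt=1
  until eval "$1"; do
    if (( attempt >= attempts )); then
      return 1
    fi
    sleep "$timeout"
    (( timeout = timeout * 2 > max_timeout ? max_timeout : timeout * 2 ))
    (( attempt++ ))
  done
}

# Same shape as the health checks above:
# ATTEMPTS=3 TIMEOUT=1 MAX_TIMEOUT=4 retry_with_backoff_sketch \
#   'curl -sSf 0.0.0.0:9002/health | jq .bootstrapped | grep -q true'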
"zone": "embedded", + "weight": 1024, + "address": "'"${dbnode_host_2}"'", "port": '"${dbnode_port}"' } ] @@ -132,13 +126,15 @@ function wait_for_db_init { echo "Wait until placement is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.'"${dbnode_id_1}"'.id)" == \"'"${dbnode_id_1}"'\" ]' wait_for_namespaces echo "Wait until bootstrapped" ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:'"${dbnode_health_port}"'/health | jq .bootstrapped)" == true ]' + '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_1_health_port}"'/health | jq .bootstrapped)" == true ]' + ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_2_health_port}"'/health | jq .bootstrapped)" == true ]' } function wait_for_namespaces { diff --git a/scripts/docker-integration-tests/prometheus_replication/docker-compose.yml b/scripts/docker-integration-tests/prometheus_replication/docker-compose.yml new file mode 100644 index 0000000000..333ee79238 --- /dev/null +++ b/scripts/docker-integration-tests/prometheus_replication/docker-compose.yml @@ -0,0 +1,48 @@ +version: "3.5" +services: + dbnode01: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9000-9004:9000-9004" + - "0.0.0.0:2379-2380:2379-2380" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + coordinator01: + expose: + - "7201" + - "7203" + ports: + - "0.0.0.0:7201:7201" + - "0.0.0.0:7203:7203" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./m3coordinator01.yml:/etc/m3coordinator/m3coordinator.yml" + dbnode02: + expose: + - "19000-19004" + - "12379-12380" + ports: + - "0.0.0.0:19000-19004:9000-9004" + - "0.0.0.0:12379-12380:2379-2380" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + coordinator02: + expose: + - "17201" + - "17203" + ports: + - "0.0.0.0:17201:7201" + - "0.0.0.0:17203:7203" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./m3coordinator02.yml:/etc/m3coordinator/m3coordinator.yml" +networks: + backend: diff --git a/scripts/docker-integration-tests/prometheus_replication/m3coordinator01.yml b/scripts/docker-integration-tests/prometheus_replication/m3coordinator01.yml new file mode 100644 index 0000000000..7cb55470b6 --- /dev/null +++ b/scripts/docker-integration-tests/prometheus_replication/m3coordinator01.yml @@ -0,0 +1,47 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + +logging: + level: info + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +writeForwarding: + promRemoteWrite: + targets: + - url: http://coordinator02:7201/api/v1/prom/remote/write + +clusters: + - namespaces: + - namespace: agg + type: aggregated + retention: 10h + resolution: 15s + - namespace: unagg + type: unaggregated + retention: 10m + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - dbnode01:2379 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + +tagOptions: + idScheme: quoted diff --git 
diff --git a/scripts/docker-integration-tests/prometheus_replication/m3coordinator02.yml b/scripts/docker-integration-tests/prometheus_replication/m3coordinator02.yml
new file mode 100644
index 0000000000..bb002d124c
--- /dev/null
+++ b/scripts/docker-integration-tests/prometheus_replication/m3coordinator02.yml
@@ -0,0 +1,42 @@
+listenAddress:
+  type: "config"
+  value: "0.0.0.0:7201"
+
+logging:
+  level: info
+
+metrics:
+  scope:
+    prefix: "coordinator"
+  prometheus:
+    handlerPath: /metrics
+    listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved
+  sanitization: prometheus
+  samplingRate: 1.0
+  extended: none
+
+clusters:
+  - namespaces:
+      - namespace: agg
+        type: aggregated
+        retention: 10h
+        resolution: 15s
+      - namespace: unagg
+        type: unaggregated
+        retention: 10m
+    client:
+      config:
+        service:
+          env: default_env
+          zone: embedded
+          service: m3db
+          cacheDir: /var/lib/m3kv
+          etcdClusters:
+            - zone: embedded
+              endpoints:
+                - dbnode02:2379
+      writeConsistencyLevel: majority
+      readConsistencyLevel: unstrict_majority
+
+tagOptions:
+  idScheme: quoted
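Reviewing the two configs side by side: m3coordinator02.yml is m3coordinator01.yml minus the writeForwarding block, with etcd pointed at its own cluster (dbnode02:2379 instead of dbnode01:2379). Cluster two only receives; nothing forwards back, so no write loop is possible. From the repo root, the quickest way to confirm exactly that:

diff scripts/docker-integration-tests/prometheus_replication/m3coordinator01.yml \
     scripts/docker-integration-tests/prometheus_replication/m3coordinator02.yml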
"$success" != "$expect_success" ]]; then + echo $expect_success_err + return 1 + fi + if [[ "$status" != "$expect_status" ]]; then + echo "${expect_status_err}: actual=${status}" + return 1 + fi + echo "Returned success=${success}, status=${status} as expected" + return 0 +} + +function test_replication_forwarding { + now=$(date +"%s") + + # Make sure both are up (otherwise forwarding could fail). + echo "Test both clusters responding to queries" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=any | jq -r ".data.result | length") -eq 0 ]]' + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s 0.0.0.0:17201/api/v1/query?query=any | jq -r ".data.result | length") -eq 0 ]]' + + # Test writing. + echo "Test write data to first cluster" + prometheus_remote_write \ + "foo_replicate" now 42.42 \ + true "Expected request to succeed" \ + 200 "Expected request to return status code 200" + + # Test queries can eventually read back replicated data from second + # cluster using port 17201 from the second cluster's coordinator + echo "Test read replicated data" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s 0.0.0.0:17201/api/v1/query?query=foo_replicate | jq -r ".data.result | length") -gt 0 ]]' +} + +# Run all tests +test_replication_forwarding diff --git a/scripts/docker-integration-tests/run.sh b/scripts/docker-integration-tests/run.sh index 5194a75166..2df8d8f0a0 100755 --- a/scripts/docker-integration-tests/run.sh +++ b/scripts/docker-integration-tests/run.sh @@ -6,6 +6,7 @@ TESTS=( scripts/docker-integration-tests/simple/test.sh scripts/docker-integration-tests/cold_writes_simple/test.sh scripts/docker-integration-tests/prometheus/test.sh + scripts/docker-integration-tests/prometheus_replication/test.sh scripts/docker-integration-tests/carbon/test.sh scripts/docker-integration-tests/aggregator/test.sh scripts/docker-integration-tests/query_fanout/test.sh diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index 650cc35cca..5d89afa53a 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -25,6 +25,7 @@ import ( "context" "errors" "fmt" + "io/ioutil" "net/http" "strings" "time" @@ -422,9 +423,8 @@ func (h *PromWriteHandler) forward( if method == "" { method = http.MethodPost } - - req, err := http.NewRequest(target.Method, target.URL, - bytes.NewReader(request.CompressedBody)) + url := target.URL + req, err := http.NewRequest(method, url, bytes.NewReader(request.CompressedBody)) if err != nil { return err } @@ -433,8 +433,16 @@ func (h *PromWriteHandler) forward( if err != nil { return err } + + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { - return fmt.Errorf("expected status code 2XX: actual=%v", resp.StatusCode) + response, err := ioutil.ReadAll(resp.Body) + if err != nil { + response = []byte(fmt.Sprintf("error reading body: %v", err)) + } + return fmt.Errorf("expected status code 2XX: actual=%v, method=%v, url=%v, resp=%s", + resp.StatusCode, method, url, response) } return nil }