diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 9c8ea68d22..786e4935a1 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -44,6 +44,7 @@ steps: <<: *common - name: "Integration (:docker:)" command: make clean install-vendor docker-integration-test + parallelism: 2 env: CGO_ENABLED: 0 GIMME_GO_VERSION: 1.12.x diff --git a/Makefile b/Makefile index e67c0fc526..74046657a2 100644 --- a/Makefile +++ b/Makefile @@ -231,13 +231,7 @@ docs-test: .PHONY: docker-integration-test docker-integration-test: @echo "--- Running Docker integration test" - @./scripts/docker-integration-tests/setup.sh - @./scripts/docker-integration-tests/simple/test.sh - @./scripts/docker-integration-tests/cold_writes_simple/test.sh - @./scripts/docker-integration-tests/prometheus/test.sh - @./scripts/docker-integration-tests/carbon/test.sh - @./scripts/docker-integration-tests/aggregator/test.sh - @./scripts/docker-integration-tests/query_fanout/test.sh + ./scripts/docker-integration-tests/run.sh .PHONY: site-build site-build: diff --git a/scripts/docker-integration-tests/common.sh b/scripts/docker-integration-tests/common.sh index 6e14aedf29..d41b390fbc 100644 --- a/scripts/docker-integration-tests/common.sh +++ b/scripts/docker-integration-tests/common.sh @@ -47,14 +47,14 @@ function setup_single_m3db_node { wait_for_db_init } -function setup_three_m3db_nodes { - local dbnode_host_1=${DBNODE_HOST:-dbnode01} - local dbnode_host_2=${DBNODE_HOST:-dbnode02} - local dbnode_host_3=${DBNODE_HOST:-dbnode03} +function setup_two_m3db_nodes { + local dbnode_id_1=${DBNODE_ID_01:-m3db_local_1} + local dbnode_id_2=${DBNODE_ID_02:-m3db_local_2} + local dbnode_host_1=${DBNODE_HOST_01:-dbnode01} + local dbnode_host_2=${DBNODE_HOST_02:-dbnode02} local dbnode_port=${DBNODE_PORT:-9000} - local dbnode_host_1_health_port=${DBNODE_HEALTH_PORT:-9012} - local dbnode_host_2_health_port=${DBNODE_HEALTH_PORT:-9022} - local dbnode_host_3_health_port=${DBNODE_HEALTH_PORT:-9032} + local dbnode_host_1_health_port=${DBNODE_HEALTH_PORT_01:-9012} + local dbnode_host_2_health_port=${DBNODE_HEALTH_PORT_02:-9022} local coordinator_port=${COORDINATOR_PORT:-7201} echo "Wait for API to be available" @@ -66,11 +66,11 @@ function setup_three_m3db_nodes { "type": "cluster", "namespaceName": "agg", "retentionTime": "6h", - "num_shards": 3, - "replicationFactor": 3, + "num_shards": 2, + "replicationFactor": 2, "hosts": [ { - "id": "m3db_local_1", + "id": "'"${dbnode_id_1}"'", "isolation_group": "rack-a", "zone": "embedded", "weight": 1024, @@ -78,27 +78,19 @@ function setup_three_m3db_nodes { "port": '"${dbnode_port}"' }, { - "id": "m3db_local_2", + "id": "'"${dbnode_id_2}"'", "isolation_group": "rack-b", "zone": "embedded", "weight": 1024, "address": "'"${dbnode_host_2}"'", "port": '"${dbnode_port}"' - }, - { - "id": "m3db_local_3", - "isolation_group": "rack-c", - "zone": "embedded", - "weight": 1024, - "address": "'"${dbnode_host_3}"'", - "port": '"${dbnode_port}"' } ] }' echo "Wait until placement is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.m3db_local_1.id)" == \"m3db_local_1\" ]' + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.'"${dbnode_id_1}"'.id)" == \"'"${dbnode_id_1}"'\" ]' wait_for_namespaces @@ -107,8 +99,6 @@ function setup_three_m3db_nodes { '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_1_health_port}"'/health | jq .bootstrapped)" == true ]' ATTEMPTS=100 
MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_2_health_port}"'/health | jq .bootstrapped)" == true ]' - ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_3_health_port}"'/health | jq .bootstrapped)" == true ]' } function wait_for_db_init { diff --git a/scripts/docker-integration-tests/repair/docker-compose.yml b/scripts/docker-integration-tests/repair/docker-compose.yml index 2bb9780e20..b91284adcf 100644 --- a/scripts/docker-integration-tests/repair/docker-compose.yml +++ b/scripts/docker-integration-tests/repair/docker-compose.yml @@ -28,20 +28,6 @@ services: - M3DB_HOST_ID=m3db_local_2 volumes: - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml" - dbnode03: - expose: - - "9000-9004" - - "2379-2380" - ports: - - "0.0.0.0:9032:9002" - - "0.0.0.0:9033:9003" - networks: - - backend - image: "m3dbnode_integration:${REVISION}" - environment: - - M3DB_HOST_ID=m3db_local_3 - volumes: - - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml" coordinator01: expose: - "7201" diff --git a/scripts/docker-integration-tests/repair/test.sh b/scripts/docker-integration-tests/repair/test.sh index 89233f3cd3..aa7629bb34 100755 --- a/scripts/docker-integration-tests/repair/test.sh +++ b/scripts/docker-integration-tests/repair/test.sh @@ -11,7 +11,6 @@ export REVISION echo "Run m3dbnode and m3coordinator containers" docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes dbnode01 docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes dbnode02 -docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes dbnode03 docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes coordinator01 # Think of this as a defer func() in golang @@ -20,7 +19,7 @@ function defer { } trap defer EXIT -setup_three_m3db_nodes +setup_two_m3db_nodes function write_data { namespace=$1 @@ -76,11 +75,7 @@ write_data "coldWritesRepairAndNoIndex" "foo" "$(($(date +"%s") - 60 * 60 * 2))" echo "Expect to read the data back from dbnode01" read_all "coldWritesRepairAndNoIndex" "foo" 1 9012 -# These two should eventually succeed once a repair detects the mismatch. +# This should eventually succeed once a repair detects the mismatch. 
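+# (retry_with_backoff is defined in common.sh; as used here it re-runs the command until it
+# succeeds or ATTEMPTS is exhausted, backing off between tries up to MAX_TIMEOUT seconds.
+# Description inferred from usage; see common.sh for the exact semantics.)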
echo "Wait for the data to become available (via repairs) from dbnode02" ATTEMPTS=30 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ read_all "coldWritesRepairAndNoIndex" "foo" 1 9022 - -echo "Wait for the data to become available (via repairs) from dbnode03" -ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ - read_all "coldWritesRepairAndNoIndex" "foo" 1 9032 diff --git a/scripts/docker-integration-tests/repair_and_replication/docker-compose.yml b/scripts/docker-integration-tests/repair_and_replication/docker-compose.yml new file mode 100644 index 0000000000..b5b52d922d --- /dev/null +++ b/scripts/docker-integration-tests/repair_and_replication/docker-compose.yml @@ -0,0 +1,88 @@ +version: "3.5" +services: + cluster_a_dbnode01: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9012:9002" + - "0.0.0.0:9013:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=cluster_a_m3db_local_1 + volumes: + - "./m3dbnode-cluster-a.yml:/etc/m3dbnode/m3dbnode.yml" + cluster_a_dbnode02: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9022:9002" + - "0.0.0.0:9023:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=cluster_a_m3db_local_2 + volumes: + - "./m3dbnode-cluster-a.yml:/etc/m3dbnode/m3dbnode.yml" + cluster_a_coordinator01: + expose: + - "7201" + - "7203" + - "7204" + ports: + - "0.0.0.0:7201:7201" + - "0.0.0.0:7203:7203" + - "0.0.0.0:7204:7204" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./m3coordinator-cluster-a.yml/:/etc/m3coordinator/m3coordinator.yml" + cluster_b_dbnode01: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9112:9002" + - "0.0.0.0:9113:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=cluster_b_m3db_local_1 + volumes: + - "./m3dbnode-cluster-b.yml:/etc/m3dbnode/m3dbnode.yml" + cluster_b_dbnode02: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9122:9002" + - "0.0.0.0:9123:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=cluster_b_m3db_local_2 + volumes: + - "./m3dbnode-cluster-b.yml:/etc/m3dbnode/m3dbnode.yml" + cluster_b_coordinator01: + expose: + - "7201" + - "7203" + - "7204" + ports: + - "0.0.0.0:17201:7201" + - "0.0.0.0:17203:7203" + - "0.0.0.0:17204:7204" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./m3coordinator-cluster-b.yml:/etc/m3coordinator/m3coordinator.yml" +networks: + backend: diff --git a/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-a.yml b/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-a.yml new file mode 100644 index 0000000000..4da4151a28 --- /dev/null +++ b/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-a.yml @@ -0,0 +1,46 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + +logging: + level: info + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +limits: + perQuery: + maxFetchedSeries: 100 + +clusters: + - namespaces: + - namespace: agg + type: aggregated + retention: 10h + resolution: 15s + - namespace: unagg + type: unaggregated + retention: 10h + client: + config: + service: + env: default_env + zone: 
embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_a_dbnode01:2379 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + +tagOptions: + idScheme: quoted diff --git a/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-b.yml b/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-b.yml new file mode 100644 index 0000000000..9a74154728 --- /dev/null +++ b/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-b.yml @@ -0,0 +1,46 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + +logging: + level: info + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +limits: + perQuery: + maxFetchedSeries: 100 + +clusters: + - namespaces: + - namespace: agg + type: aggregated + retention: 10h + resolution: 15s + - namespace: unagg + type: unaggregated + retention: 10h + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_b_dbnode01:2379 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + +tagOptions: + idScheme: quoted diff --git a/scripts/docker-integration-tests/repair_and_replication/m3dbnode-cluster-a.yml b/scripts/docker-integration-tests/repair_and_replication/m3dbnode-cluster-a.yml new file mode 100644 index 0000000000..002f7dd1cf --- /dev/null +++ b/scripts/docker-integration-tests/repair_and_replication/m3dbnode-cluster-a.yml @@ -0,0 +1,103 @@ +db: + logging: + level: info + + tracing: + backend: jaeger + jaeger: + reporter: + localAgentHostPort: jaeger:6831 + sampler: + type: const + param: 1 + + metrics: + prometheus: + handlerPath: /metrics + sanitization: prometheus + samplingRate: 1.0 + extended: detailed + + listenAddress: 0.0.0.0:9000 + clusterListenAddress: 0.0.0.0:9001 + httpNodeListenAddress: 0.0.0.0:9002 + httpClusterListenAddress: 0.0.0.0:9003 + debugListenAddress: 0.0.0.0:9004 + + hostID: + resolver: environment + envVarName: M3DB_HOST_ID + + client: + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + + gcPercentage: 100 + + writeNewSeriesAsync: true + writeNewSeriesLimitPerSecond: 1048576 + writeNewSeriesBackoffDuration: 2ms + + bootstrap: + # Intentionally disable peers bootstrapper to ensure it doesn't interfere with test. + bootstrappers: + - filesystem + - commitlog + - uninitialized_topology + commitlog: + returnUnfulfilledForCorruptCommitLogFiles: false + + cache: + series: + policy: lru + postingsList: + size: 262144 + + commitlog: + flushMaxBytes: 524288 + flushEvery: 1s + queue: + calculationType: fixed + size: 2097152 + + fs: + filePathPrefix: /var/lib/m3db + + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_a_dbnode01:2379 + seedNodes: + initialCluster: + - hostID: cluster_a_m3db_local_1 + endpoint: http://cluster_a_dbnode01:2380 + + # Enable repairs (within cluster a). + repair: + enabled: true + throttle: 1ms + checkInterval: 1ms + + # Enable replication (from cluster b). 
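+  # Each entry under replication.clusters names a remote cluster and carries its own client/etcd
+  # configuration for reaching it; setting repairEnabled on an entry makes the repair process also
+  # compare and fetch blocks from that cluster (as wired up in server.go in this change).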
+ replication: + clusters: + - name: "cluster-b" + repairEnabled: true + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_b_dbnode01:2379 + diff --git a/scripts/docker-integration-tests/repair_and_replication/m3dbnode-cluster-b.yml b/scripts/docker-integration-tests/repair_and_replication/m3dbnode-cluster-b.yml new file mode 100644 index 0000000000..59dcc55b6b --- /dev/null +++ b/scripts/docker-integration-tests/repair_and_replication/m3dbnode-cluster-b.yml @@ -0,0 +1,103 @@ +db: + logging: + level: info + + tracing: + backend: jaeger + jaeger: + reporter: + localAgentHostPort: jaeger:6831 + sampler: + type: const + param: 1 + + metrics: + prometheus: + handlerPath: /metrics + sanitization: prometheus + samplingRate: 1.0 + extended: detailed + + listenAddress: 0.0.0.0:9000 + clusterListenAddress: 0.0.0.0:9001 + httpNodeListenAddress: 0.0.0.0:9002 + httpClusterListenAddress: 0.0.0.0:9003 + debugListenAddress: 0.0.0.0:9004 + + hostID: + resolver: environment + envVarName: M3DB_HOST_ID + + client: + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + + gcPercentage: 100 + + writeNewSeriesAsync: true + writeNewSeriesLimitPerSecond: 1048576 + writeNewSeriesBackoffDuration: 2ms + + bootstrap: + # Intentionally disable peers bootstrapper to ensure it doesn't interfere with test. + bootstrappers: + - filesystem + - commitlog + - uninitialized_topology + commitlog: + returnUnfulfilledForCorruptCommitLogFiles: false + + cache: + series: + policy: lru + postingsList: + size: 262144 + + commitlog: + flushMaxBytes: 524288 + flushEvery: 1s + queue: + calculationType: fixed + size: 2097152 + + fs: + filePathPrefix: /var/lib/m3db + + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_b_dbnode01:2379 + seedNodes: + initialCluster: + - hostID: cluster_b_m3db_local_1 + endpoint: http://cluster_b_dbnode01:2380 + + # Enable repairs (within cluster b). + repair: + enabled: true + throttle: 1ms + checkInterval: 1ms + + # Enable replication (from cluster a). 
+ replication: + clusters: + - name: "cluster-a" + repairEnabled: true + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_a_dbnode01:2379 + diff --git a/scripts/docker-integration-tests/repair_and_replication/test.sh b/scripts/docker-integration-tests/repair_and_replication/test.sh new file mode 100755 index 0000000000..cd4ff92f08 --- /dev/null +++ b/scripts/docker-integration-tests/repair_and_replication/test.sh @@ -0,0 +1,115 @@ +#!/usr/bin/env bash + +set -xe + +source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh +REVISION=$(git rev-parse HEAD) +SCRIPT_PATH=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/repair_and_replication +COMPOSE_FILE=$SCRIPT_PATH/docker-compose.yml +export REVISION + +echo "Run m3dbnode and m3coordinator containers" +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_a_dbnode01 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_a_dbnode02 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_a_coordinator01 + +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_b_dbnode01 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_b_dbnode02 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_b_coordinator01 + +# Think of this as a defer func() in golang +function defer { + docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes +} +trap defer EXIT + +# Setup cluster A. +DBNODE_ID_01=cluster_a_m3db_local_1 \ +DBNODE_ID_02=cluster_a_m3db_local_2 \ +DBNODE_HOST_01=cluster_a_dbnode01 \ +DBNODE_HOST_02=cluster_a_dbnode02 \ +DBNODE_HEALTH_PORT_01=9012 \ +DBNODE_HEALTH_PORT_02=9022 \ +COORDINATOR_PORT=7201 \ + setup_two_m3db_nodes + +# Setup cluster B. +DBNODE_ID_01=cluster_b_m3db_local_1 \ +DBNODE_ID_02=cluster_b_m3db_local_2 \ +DBNODE_HOST_01=cluster_b_dbnode01 \ +DBNODE_HOST_02=cluster_b_dbnode02 \ +DBNODE_HEALTH_PORT_01=9112 \ +DBNODE_HEALTH_PORT_02=9122 \ +COORDINATOR_PORT=17201 \ + setup_two_m3db_nodes + +function write_data { + namespace=$1 + id=$2 + timestamp=$3 + value=$4 + port=$5 + + respCode=$(curl -s -o /dev/null -X POST -w "%{http_code}" 0.0.0.0:"$port"/write -d '{ + "namespace": "'"$namespace"'", + "id": "'"$id"'", + "datapoint": { + "timestamp":'"$timestamp"', + "value": '"$value"' + } + }') + + + if [[ $respCode -eq "200" ]]; then + return 0 + else + return 1 + fi +} + +function read_all { + namespace=$1 + id=$2 + expected_datapoints=$3 + port=$4 + + received_datapoints=$(curl -sSf -X POST 0.0.0.0:"$port"/fetch -d '{ + "namespace": "'"$namespace"'", + "id": "'"$id"'", + "rangeStart": 0, + "rangeEnd":'"$(date +"%s")"' + }' | jq '.datapoints | length') + + if [[ $expected_datapoints -eq $received_datapoints ]]; then + return 0 + else + return 1 + fi +} + +# Write 2 block sizes into the past to ensure it's a repairable block since the current mutable +# block will not be repaired. Use the node-specific port to ensure the write only goes to dbnode01 +# and not the other two nodes. +echo "Write data for 'now - 2 * blockSize' to cluster_a_dbnode01" +write_data "coldWritesRepairAndNoIndex" "foo" "$(($(date +"%s") - 60 * 60 * 2))" 12.3456789 9012 + +# This should pass immediately since it was written to this node. 
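+# (Port 9012 is the docker-compose mapping for cluster_a_dbnode01's node-level HTTP endpoint,
+# container port 9002, so this read is served directly by the node that received the write.)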
+echo "Expect to read the data back from dbnode01" +read_all "coldWritesRepairAndNoIndex" "foo" 1 9012 + +# This should eventually succeed once a repair detects the mismatch +# and repairs the data within the cluster. +echo "Wait for the data to become available (via repairs) from cluster_a_dbnode02" +ATTEMPTS=30 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + read_all "coldWritesRepairAndNoIndex" "foo" 1 9022 + +# These two should eventually succeed once the replication feature detects +# the mismatch and repairs the data across the clusters. +echo "Wait for the data to become available (via replication) from cluster_b_dbnode01" +ATTEMPTS=30 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + read_all "coldWritesRepairAndNoIndex" "foo" 1 9112 + +echo "Wait for the data to become available (via replication) from cluster_b_dbnode02" +ATTEMPTS=30 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + read_all "coldWritesRepairAndNoIndex" "foo" 1 9122 + diff --git a/scripts/docker-integration-tests/replication/docker-compose.yml b/scripts/docker-integration-tests/replication/docker-compose.yml new file mode 100644 index 0000000000..b5b52d922d --- /dev/null +++ b/scripts/docker-integration-tests/replication/docker-compose.yml @@ -0,0 +1,88 @@ +version: "3.5" +services: + cluster_a_dbnode01: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9012:9002" + - "0.0.0.0:9013:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=cluster_a_m3db_local_1 + volumes: + - "./m3dbnode-cluster-a.yml:/etc/m3dbnode/m3dbnode.yml" + cluster_a_dbnode02: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9022:9002" + - "0.0.0.0:9023:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=cluster_a_m3db_local_2 + volumes: + - "./m3dbnode-cluster-a.yml:/etc/m3dbnode/m3dbnode.yml" + cluster_a_coordinator01: + expose: + - "7201" + - "7203" + - "7204" + ports: + - "0.0.0.0:7201:7201" + - "0.0.0.0:7203:7203" + - "0.0.0.0:7204:7204" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./m3coordinator-cluster-a.yml/:/etc/m3coordinator/m3coordinator.yml" + cluster_b_dbnode01: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9112:9002" + - "0.0.0.0:9113:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=cluster_b_m3db_local_1 + volumes: + - "./m3dbnode-cluster-b.yml:/etc/m3dbnode/m3dbnode.yml" + cluster_b_dbnode02: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9122:9002" + - "0.0.0.0:9123:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=cluster_b_m3db_local_2 + volumes: + - "./m3dbnode-cluster-b.yml:/etc/m3dbnode/m3dbnode.yml" + cluster_b_coordinator01: + expose: + - "7201" + - "7203" + - "7204" + ports: + - "0.0.0.0:17201:7201" + - "0.0.0.0:17203:7203" + - "0.0.0.0:17204:7204" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./m3coordinator-cluster-b.yml:/etc/m3coordinator/m3coordinator.yml" +networks: + backend: diff --git a/scripts/docker-integration-tests/replication/m3coordinator-cluster-a.yml b/scripts/docker-integration-tests/replication/m3coordinator-cluster-a.yml new file mode 100644 index 0000000000..4da4151a28 --- /dev/null +++ b/scripts/docker-integration-tests/replication/m3coordinator-cluster-a.yml @@ -0,0 +1,46 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + 
+logging: + level: info + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +limits: + perQuery: + maxFetchedSeries: 100 + +clusters: + - namespaces: + - namespace: agg + type: aggregated + retention: 10h + resolution: 15s + - namespace: unagg + type: unaggregated + retention: 10h + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_a_dbnode01:2379 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + +tagOptions: + idScheme: quoted diff --git a/scripts/docker-integration-tests/replication/m3coordinator-cluster-b.yml b/scripts/docker-integration-tests/replication/m3coordinator-cluster-b.yml new file mode 100644 index 0000000000..9a74154728 --- /dev/null +++ b/scripts/docker-integration-tests/replication/m3coordinator-cluster-b.yml @@ -0,0 +1,46 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + +logging: + level: info + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +limits: + perQuery: + maxFetchedSeries: 100 + +clusters: + - namespaces: + - namespace: agg + type: aggregated + retention: 10h + resolution: 15s + - namespace: unagg + type: unaggregated + retention: 10h + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_b_dbnode01:2379 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + +tagOptions: + idScheme: quoted diff --git a/scripts/docker-integration-tests/replication/m3dbnode-cluster-a.yml b/scripts/docker-integration-tests/replication/m3dbnode-cluster-a.yml new file mode 100644 index 0000000000..aca38c3af9 --- /dev/null +++ b/scripts/docker-integration-tests/replication/m3dbnode-cluster-a.yml @@ -0,0 +1,103 @@ +db: + logging: + level: info + + tracing: + backend: jaeger + jaeger: + reporter: + localAgentHostPort: jaeger:6831 + sampler: + type: const + param: 1 + + metrics: + prometheus: + handlerPath: /metrics + sanitization: prometheus + samplingRate: 1.0 + extended: detailed + + listenAddress: 0.0.0.0:9000 + clusterListenAddress: 0.0.0.0:9001 + httpNodeListenAddress: 0.0.0.0:9002 + httpClusterListenAddress: 0.0.0.0:9003 + debugListenAddress: 0.0.0.0:9004 + + hostID: + resolver: environment + envVarName: M3DB_HOST_ID + + client: + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + + gcPercentage: 100 + + writeNewSeriesAsync: true + writeNewSeriesLimitPerSecond: 1048576 + writeNewSeriesBackoffDuration: 2ms + + bootstrap: + # Intentionally disable peers bootstrapper to ensure it doesn't interfere with test. 
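+    # With the peers bootstrapper omitted, a node that is missing this data can only converge via
+    # the repair/replication path that this test exercises, not by streaming blocks from peers at
+    # startup.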
+ bootstrappers: + - filesystem + - commitlog + - uninitialized_topology + commitlog: + returnUnfulfilledForCorruptCommitLogFiles: false + + cache: + series: + policy: lru + postingsList: + size: 262144 + + commitlog: + flushMaxBytes: 524288 + flushEvery: 1s + queue: + calculationType: fixed + size: 2097152 + + fs: + filePathPrefix: /var/lib/m3db + + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_a_dbnode01:2379 + seedNodes: + initialCluster: + - hostID: cluster_a_m3db_local_1 + endpoint: http://cluster_a_dbnode01:2380 + + # Disable repairs (within cluster a). + repair: + enabled: false + throttle: 1ms + checkInterval: 1ms + + # Enable replication (from cluster b). + replication: + clusters: + - name: "cluster-b" + repairEnabled: true + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_b_dbnode01:2379 + diff --git a/scripts/docker-integration-tests/replication/m3dbnode-cluster-b.yml b/scripts/docker-integration-tests/replication/m3dbnode-cluster-b.yml new file mode 100644 index 0000000000..558e511304 --- /dev/null +++ b/scripts/docker-integration-tests/replication/m3dbnode-cluster-b.yml @@ -0,0 +1,105 @@ +db: + logging: + level: info + + tracing: + backend: jaeger + jaeger: + reporter: + localAgentHostPort: jaeger:6831 + sampler: + type: const + param: 1 + + metrics: + prometheus: + handlerPath: /metrics + sanitization: prometheus + samplingRate: 1.0 + extended: detailed + + listenAddress: 0.0.0.0:9000 + clusterListenAddress: 0.0.0.0:9001 + httpNodeListenAddress: 0.0.0.0:9002 + httpClusterListenAddress: 0.0.0.0:9003 + debugListenAddress: 0.0.0.0:9004 + + hostID: + resolver: environment + envVarName: M3DB_HOST_ID + + client: + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + + gcPercentage: 100 + + writeNewSeriesAsync: true + writeNewSeriesLimitPerSecond: 1048576 + writeNewSeriesBackoffDuration: 2ms + + bootstrap: + # Intentionally disable peers bootstrapper to ensure it doesn't interfere with test. + bootstrappers: + - filesystem + - commitlog + - uninitialized_topology + commitlog: + returnUnfulfilledForCorruptCommitLogFiles: false + + cache: + series: + policy: lru + postingsList: + size: 262144 + + commitlog: + flushMaxBytes: 524288 + flushEvery: 1s + queue: + calculationType: fixed + size: 2097152 + + fs: + filePathPrefix: /var/lib/m3db + + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_b_dbnode01:2379 + seedNodes: + initialCluster: + - hostID: cluster_b_m3db_local_1 + endpoint: http://cluster_b_dbnode01:2380 + + # Disable repairs (within cluster b). + repair: + enabled: false + # Repair settings still impact replication even if repairs + # within the cluster are disabled. + throttle: 1ms + checkInterval: 1ms + + # Enable replication (from cluster a). 
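+  # repairEnabled on a replicated cluster entry is independent of the top-level repair.enabled
+  # flag above: the former pulls data from the named remote cluster, while the latter only governs
+  # repairs between nodes of this cluster (see the repair client wiring in server.go).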
+ replication: + clusters: + - name: "cluster-a" + repairEnabled: true + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - cluster_a_dbnode01:2379 + diff --git a/scripts/docker-integration-tests/replication/test.sh b/scripts/docker-integration-tests/replication/test.sh new file mode 100755 index 0000000000..9cf96fa82a --- /dev/null +++ b/scripts/docker-integration-tests/replication/test.sh @@ -0,0 +1,108 @@ +#!/usr/bin/env bash + +set -xe + +source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh +REVISION=$(git rev-parse HEAD) +SCRIPT_PATH=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/replication +COMPOSE_FILE=$SCRIPT_PATH/docker-compose.yml +export REVISION + +echo "Run m3dbnode and m3coordinator containers" +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_a_dbnode01 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_a_dbnode02 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_a_coordinator01 + +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_b_dbnode01 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_b_dbnode02 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes cluster_b_coordinator01 + +# Think of this as a defer func() in golang +function defer { + docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes +} +trap defer EXIT + +# Setup cluster A. +DBNODE_ID_01=cluster_a_m3db_local_1 \ +DBNODE_ID_02=cluster_a_m3db_local_2 \ +DBNODE_HOST_01=cluster_a_dbnode01 \ +DBNODE_HOST_02=cluster_a_dbnode02 \ +DBNODE_HEALTH_PORT_01=9012 \ +DBNODE_HEALTH_PORT_02=9022 \ +COORDINATOR_PORT=7201 \ + setup_two_m3db_nodes + +# Setup cluster B. +DBNODE_ID_01=cluster_b_m3db_local_1 \ +DBNODE_ID_02=cluster_b_m3db_local_2 \ +DBNODE_HOST_01=cluster_b_dbnode01 \ +DBNODE_HOST_02=cluster_b_dbnode02 \ +DBNODE_HEALTH_PORT_01=9112 \ +DBNODE_HEALTH_PORT_02=9122 \ +COORDINATOR_PORT=17201 \ + setup_two_m3db_nodes + +function write_data { + namespace=$1 + id=$2 + timestamp=$3 + value=$4 + port=$5 + + respCode=$(curl -s -o /dev/null -X POST -w "%{http_code}" 0.0.0.0:"$port"/write -d '{ + "namespace": "'"$namespace"'", + "id": "'"$id"'", + "datapoint": { + "timestamp":'"$timestamp"', + "value": '"$value"' + } + }') + + + if [[ $respCode -eq "200" ]]; then + return 0 + else + return 1 + fi +} + +function read_all { + namespace=$1 + id=$2 + expected_datapoints=$3 + port=$4 + + received_datapoints=$(curl -sSf -X POST 0.0.0.0:"$port"/fetch -d '{ + "namespace": "'"$namespace"'", + "id": "'"$id"'", + "rangeStart": 0, + "rangeEnd":'"$(date +"%s")"' + }' | jq '.datapoints | length') + + if [[ $expected_datapoints -eq $received_datapoints ]]; then + return 0 + else + return 1 + fi +} + +# Write 2 block sizes into the past to ensure it's a repairable block since the current mutable +# block will not be repaired. Use the node-specific port to ensure the write only goes to dbnode01 +# and not the other two nodes. +echo "Write data for 'now - 2 * blockSize' to cluster_a_dbnode01" +write_data "coldWritesRepairAndNoIndex" "foo" "$(($(date +"%s") - 60 * 60 * 2))" 12.3456789 9012 + +# This should pass immediately since it was written to this node. 
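+# (Unlike the repair_and_replication variant, repair.enabled is false in both m3dbnode configs
+# here, so no read is asserted against cluster_a_dbnode02; only the cross-cluster replication path
+# is verified below.)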
+echo "Expect to read the data back from dbnode01" +read_all "coldWritesRepairAndNoIndex" "foo" 1 9012 + +# These two should eventually succeed once the replication feature detects +# the mismatch and repairs the data across the clusters. +echo "Wait for the data to become available (via replication) from cluster_b_dbnode01" +ATTEMPTS=30 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + read_all "coldWritesRepairAndNoIndex" "foo" 1 9112 + +echo "Wait for the data to become available (via replication) from cluster_b_dbnode02" +ATTEMPTS=30 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + read_all "coldWritesRepairAndNoIndex" "foo" 1 9122 diff --git a/scripts/docker-integration-tests/run.sh b/scripts/docker-integration-tests/run.sh new file mode 100755 index 0000000000..45d52aa760 --- /dev/null +++ b/scripts/docker-integration-tests/run.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash + +set -e + +TESTS=( + scripts/docker-integration-tests/simple/test.sh + scripts/docker-integration-tests/cold_writes_simple/test.sh + scripts/docker-integration-tests/prometheus/test.sh + scripts/docker-integration-tests/carbon/test.sh + scripts/docker-integration-tests/aggregator/test.sh + scripts/docker-integration-tests/query_fanout/test.sh + scripts/docker-integration-tests/repair/test.sh + scripts/docker-integration-tests/replication/test.sh + scripts/docker-integration-tests/repair_and_replication/test.sh +) + +scripts/docker-integration-tests/setup.sh + +NUM_TESTS=${#TESTS[@]} +MIN_IDX=$((NUM_TESTS*BUILDKITE_PARALLEL_JOB/BUILDKITE_PARALLEL_JOB_COUNT)) +MAX_IDX=$(((NUM_TESTS*(BUILDKITE_PARALLEL_JOB+1)/BUILDKITE_PARALLEL_JOB_COUNT)-1)) + +ITER=0 +for test in "${TESTS[@]}"; do + if [[ $ITER -ge $MIN_IDX && $ITER -le $MAX_IDX ]]; then + echo "----------------------------------------------" + echo "running $test" + "$test" + fi + ITER="$((ITER+1))" +done \ No newline at end of file diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index 700575c0cf..1f8f8f8e80 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -130,9 +130,12 @@ type DBConfiguration struct { // The commit log policy for the node. CommitLog CommitLogPolicy `yaml:"commitlog"` - // The repair policy for repairing in-memory data. + // The repair policy for repairing data within a cluster. Repair *RepairPolicy `yaml:"repair"` + // The replication policy for replicating data between clusters. + Replication *ReplicationPolicy `yaml:"replication"` + // The pooling policy. PoolingPolicy PoolingPolicy `yaml:"pooling"` @@ -179,6 +182,12 @@ func (c *DBConfiguration) InitDefaultsAndValidate() error { return err } + if c.Replication != nil { + if err := c.Replication.Validate(); err != nil { + return err + } + } + return nil } @@ -217,6 +226,7 @@ type TransformConfiguration struct { ForcedValue *float64 `yaml:"forceValue"` } +// Validate validates the transform configuration. func (c *TransformConfiguration) Validate() error { if c == nil { return nil @@ -306,10 +316,10 @@ type RepairPolicy struct { Enabled bool `yaml:"enabled"` // The repair throttle. - Throttle time.Duration `yaml:"throttle" validate:"nonzero"` + Throttle time.Duration `yaml:"throttle"` // The repair check interval. - CheckInterval time.Duration `yaml:"checkInterval" validate:"nonzero"` + CheckInterval time.Duration `yaml:"checkInterval"` // Whether debug shadow comparisons are enabled. 
DebugShadowComparisonsEnabled bool `yaml:"debugShadowComparisonsEnabled"` @@ -319,6 +329,51 @@ type RepairPolicy struct { DebugShadowComparisonsPercentage float64 `yaml:"debugShadowComparisonsPercentage"` } +// ReplicationPolicy is the replication policy. +type ReplicationPolicy struct { + Clusters []ReplicatedCluster `yaml:"clusters"` +} + +// Validate validates the replication policy. +func (r *ReplicationPolicy) Validate() error { + names := map[string]bool{} + for _, c := range r.Clusters { + if err := c.Validate(); err != nil { + return err + } + + if _, ok := names[c.Name]; ok { + return fmt.Errorf( + "replicated cluster names must be unique, but %s was repeated", + c.Name) + } + names[c.Name] = true + } + + return nil +} + +// ReplicatedCluster defines a cluster to replicate data from. +type ReplicatedCluster struct { + Name string `yaml:"name"` + RepairEnabled bool `yaml:"repairEnabled"` + Client *client.Configuration `yaml:"client"` +} + +// Validate validates the configuration for a replicated cluster. +func (r *ReplicatedCluster) Validate() error { + if r.Name == "" { + return errors.New("replicated cluster must be assigned a name") + } + + if r.RepairEnabled && r.Client == nil { + return fmt.Errorf( + "replicated cluster: %s has repair enabled but not client configuration", r.Name) + } + + return nil +} + // HashingConfiguration is the configuration for hashing. type HashingConfiguration struct { // Murmur32 seed value. @@ -332,6 +387,7 @@ type ProtoConfiguration struct { SchemaRegistry map[string]NamespaceProtoSchema `yaml:"schema_registry"` } +// NamespaceProtoSchema is the namespace protobuf schema. type NamespaceProtoSchema struct { // For application m3db client integration test convenience (where a local dbnode is started as a docker container), // we allow loading user schema from local file into schema registry. diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 3b661eda97..cf4e92364a 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -442,6 +442,7 @@ func TestConfiguration(t *testing.T) { checkInterval: 1m0s debugShadowComparisonsEnabled: false debugShadowComparisonsPercentage: 0 + replication: null pooling: blockAllocSize: 16 type: simple diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index f8a79a22c1..e0d61073bb 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -301,7 +301,7 @@ func newDefaultBootstrappableTestSetups( setup.storageOpts.RepairOptions(). SetRepairThrottle(time.Millisecond). SetRepairCheckInterval(time.Millisecond). - SetAdminClient(adminClient). + SetAdminClients([]client.AdminClient{adminClient}). SetDebugShadowComparisonsPercentage(1.0). // Avoid log spam. SetDebugShadowComparisonsEnabled(false)) diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index edfd0037a4..f16474b2eb 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -333,7 +333,9 @@ func newTestSetup(t *testing.T, opts testOptions, fsOpts fs.Options) (*testSetup storageOpts = storageOpts.SetPersistManager(pm) // Set up repair options - storageOpts = storageOpts.SetRepairOptions(storageOpts.RepairOptions().SetAdminClient(adminClient)) + storageOpts = storageOpts. + SetRepairOptions(storageOpts.RepairOptions(). 
+ SetAdminClients([]client.AdminClient{adminClient})) // Set up block retriever manager if mgr := opts.DatabaseBlockRetrieverManager(); mgr != nil { diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index edbbb132c9..e2930b5628 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -625,33 +625,9 @@ func Run(runOpts RunOptions) { } origin := topology.NewHost(hostID, "") - m3dbClient, err := cfg.Client.NewAdminClient( - client.ConfigurationParameters{ - InstrumentOptions: iopts. - SetMetricsScope(iopts.MetricsScope().SubScope("m3dbclient")), - TopologyInitializer: envCfg.TopologyInitializer, - }, - func(opts client.AdminOptions) client.AdminOptions { - return opts.SetRuntimeOptionsManager(runtimeOptsMgr).(client.AdminOptions) - }, - func(opts client.AdminOptions) client.AdminOptions { - return opts.SetContextPool(opts.ContextPool()).(client.AdminOptions) - }, - func(opts client.AdminOptions) client.AdminOptions { - return opts.SetOrigin(origin) - }, - func(opts client.AdminOptions) client.AdminOptions { - if cfg.Proto != nil && cfg.Proto.Enabled { - return opts.SetEncodingProto( - encoding.NewOptions(), - ).(client.AdminOptions) - } - return opts - }, - func(opts client.AdminOptions) client.AdminOptions { - return opts.SetSchemaRegistry(schemaRegistry) - }, - ) + m3dbClient, err := newAdminClient( + cfg.Client, iopts, envCfg.TopologyInitializer, runtimeOptsMgr, + origin, protoEnabled, schemaRegistry, envCfg.KVStore, logger) if err != nil { logger.Fatal("could not create m3db client", zap.Error(err)) } @@ -660,11 +636,6 @@ func Run(runOpts RunOptions) { runOpts.ClientCh <- m3dbClient } - // Kick off runtime options manager KV watches - clientAdminOpts := m3dbClient.Options().(client.AdminOptions) - kvWatchClientConsistencyLevels(envCfg.KVStore, logger, - clientAdminOpts, runtimeOptsMgr) - mutableSegmentAlloc := index.NewBootstrapResultMutableSegmentAllocator( opts.IndexOptions()) rsOpts := result.NewOptions(). @@ -673,23 +644,60 @@ func Run(runOpts RunOptions) { SetSeriesCachePolicy(opts.SeriesCachePolicy()). SetIndexMutableSegmentAllocator(mutableSegmentAlloc) - opts = opts.SetRepairEnabled(false) - if cfg.Repair != nil { + var repairClients []client.AdminClient + if cfg.Repair != nil && cfg.Repair.Enabled { + repairClients = append(repairClients, m3dbClient) + } + if cfg.Replication != nil { + for _, cluster := range cfg.Replication.Clusters { + if !cluster.RepairEnabled { + continue + } + + // Pass nil for the topology initializer because we want to create + // a new one for the cluster we wish to replicate from, not use the + // same one as the cluster this node belongs to. + var topologyInitializer topology.Initializer + // Guaranteed to not be nil if repair is enabled by config validation. + clientCfg := *cluster.Client + clusterClient, err := newAdminClient( + clientCfg, iopts, topologyInitializer, runtimeOptsMgr, + origin, protoEnabled, schemaRegistry, envCfg.KVStore, logger) + if err != nil { + logger.Fatal( + "unable to create client for replicated cluster: %s", + zap.String("clusterName", cluster.Name)) + } + repairClients = append(repairClients, clusterClient) + } + } + repairEnabled := len(repairClients) > 0 + if repairEnabled { repairOpts := opts.RepairOptions(). - SetRepairThrottle(cfg.Repair.Throttle). - SetRepairCheckInterval(cfg.Repair.CheckInterval). - SetAdminClient(m3dbClient). - SetResultOptions(rsOpts). 
- SetDebugShadowComparisonsEnabled(cfg.Repair.DebugShadowComparisonsEnabled) - - if cfg.Repair.DebugShadowComparisonsPercentage > 0 { - // Set conditionally to avoid stomping on the default value of 1.0. - repairOpts = repairOpts.SetDebugShadowComparisonsPercentage(cfg.Repair.DebugShadowComparisonsPercentage) + SetAdminClients(repairClients) + + if cfg.Repair != nil { + repairOpts = repairOpts. + SetResultOptions(rsOpts). + SetDebugShadowComparisonsEnabled(cfg.Repair.DebugShadowComparisonsEnabled) + if cfg.Repair.Throttle > 0 { + repairOpts = repairOpts.SetRepairThrottle(cfg.Repair.Throttle) + } + if cfg.Repair.CheckInterval > 0 { + repairOpts = repairOpts.SetRepairCheckInterval(cfg.Repair.CheckInterval) + } + + if cfg.Repair.DebugShadowComparisonsPercentage > 0 { + // Set conditionally to avoid stomping on the default value of 1.0. + repairOpts = repairOpts.SetDebugShadowComparisonsPercentage(cfg.Repair.DebugShadowComparisonsPercentage) + } } opts = opts. - SetRepairEnabled(cfg.Repair.Enabled). + SetRepairEnabled(true). SetRepairOptions(repairOpts) + } else { + opts = opts.SetRepairEnabled(false) } // Set bootstrap options - We need to create a topology map provider from the @@ -1442,6 +1450,55 @@ func withEncodingAndPoolingOptions( return opts.SetIndexOptions(indexOpts) } +func newAdminClient( + config client.Configuration, + iopts instrument.Options, + topologyInitializer topology.Initializer, + runtimeOptsMgr m3dbruntime.OptionsManager, + origin topology.Host, + protoEnabled bool, + schemaRegistry namespace.SchemaRegistry, + kvStore kv.Store, + logger *zap.Logger, +) (client.AdminClient, error) { + m3dbClient, err := config.NewAdminClient( + client.ConfigurationParameters{ + InstrumentOptions: iopts. + SetMetricsScope(iopts.MetricsScope().SubScope("m3dbclient")), + TopologyInitializer: topologyInitializer, + }, + func(opts client.AdminOptions) client.AdminOptions { + return opts.SetRuntimeOptionsManager(runtimeOptsMgr).(client.AdminOptions) + }, + func(opts client.AdminOptions) client.AdminOptions { + return opts.SetContextPool(opts.ContextPool()).(client.AdminOptions) + }, + func(opts client.AdminOptions) client.AdminOptions { + return opts.SetOrigin(origin) + }, + func(opts client.AdminOptions) client.AdminOptions { + if protoEnabled { + return opts.SetEncodingProto( + encoding.NewOptions(), + ).(client.AdminOptions) + } + return opts + }, + func(opts client.AdminOptions) client.AdminOptions { + return opts.SetSchemaRegistry(schemaRegistry) + }, + ) + if err != nil { + return nil, err + } + + // Kick off runtime options manager KV watches. 
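+	// Note that newAdminClient is also used to build clients for replicated clusters, so every
+	// client constructed here registers these consistency-level watches against the local KV
+	// store, matching what was previously done only for the node's own cluster client.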
+ clientAdminOpts := m3dbClient.Options().(client.AdminOptions) + kvWatchClientConsistencyLevels(kvStore, logger, + clientAdminOpts, runtimeOptsMgr) + return m3dbClient, nil +} + func poolOptions( policy config.PoolPolicy, scope tally.Scope, diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 69d6c3d684..987586ac1e 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -39,6 +39,7 @@ import ( dberrors "github.com/m3db/m3/src/dbnode/storage/errors" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/repair" + "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" @@ -70,6 +71,7 @@ var ( defaultTestNs1Opts = namespace.NewOptions().SetRetentionOptions(defaultTestRetentionOpts) defaultTestNs2Opts = namespace.NewOptions().SetRetentionOptions(defaultTestNs2RetentionOpts) testSchemaHistory = prototest.NewSchemaHistory() + testClientOptions = client.NewOptions() ) type nsMapCh chan namespace.Map @@ -126,8 +128,14 @@ func testNamespaceMap(t *testing.T) namespace.Map { } func testRepairOptions(ctrl *gomock.Controller) repair.Options { + var ( + origin = topology.NewHost("some-id", "some-address") + clientOpts = testClientOptions.(client.AdminOptions).SetOrigin(origin) + mockClient = client.NewMockAdminClient(ctrl) + ) + mockClient.EXPECT().Options().Return(clientOpts).AnyTimes() return repair.NewOptions(). - SetAdminClient(client.NewMockAdminClient(ctrl)). + SetAdminClients([]client.AdminClient{mockClient}). SetRepairCheckInterval(100 * time.Millisecond) } diff --git a/src/dbnode/storage/repair.go b/src/dbnode/storage/repair.go index a8d483239a..36ece5bd78 100644 --- a/src/dbnode/storage/repair.go +++ b/src/dbnode/storage/repair.go @@ -37,6 +37,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/repair" + "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/dice" @@ -61,7 +62,7 @@ type recordFn func(namespace ident.ID, shard databaseShard, diffRes repair.Metad type shardRepairer struct { opts Options rpopts repair.Options - client client.AdminClient + clients []client.AdminClient recordFn recordFn logger *zap.Logger scope tally.Scope @@ -73,12 +74,12 @@ func newShardRepairer(opts Options, rpopts repair.Options) databaseShardRepairer scope := iopts.MetricsScope().SubScope("repair") r := shardRepairer{ - opts: opts, - rpopts: rpopts, - client: rpopts.AdminClient(), - logger: iopts.Logger(), - scope: scope, - nowFn: opts.ClockOptions().NowFn(), + opts: opts, + rpopts: rpopts, + clients: rpopts.AdminClients(), + logger: iopts.Logger(), + scope: scope, + nowFn: opts.ClockOptions().NowFn(), } r.recordFn = r.recordDifferences @@ -96,15 +97,31 @@ func (r shardRepairer) Repair( tr xtime.Range, shard databaseShard, ) (repair.MetadataComparisonResult, error) { - session, err := r.client.DefaultAdminSession() - if err != nil { - return repair.MetadataComparisonResult{}, err + var sessions []sessionAndTopo + for _, c := range r.clients { + session, err := c.DefaultAdminSession() + if err != nil { + fmtErr := fmt.Errorf("error obtaining default admin session: %v", err) + return repair.MetadataComparisonResult{}, fmtErr + } + topo, err := session.TopologyMap() + if err != nil { + fmtErr := 
fmt.Errorf("error obtaining topology map: %v", err) + return repair.MetadataComparisonResult{}, fmtErr + } + + sessions = append(sessions, sessionAndTopo{ + session: session, + topo: topo, + }) } var ( - start = tr.Start - end = tr.End - origin = session.Origin() + start = tr.Start + end = tr.End + // Guaranteed to have at least one session and all should have an identical + // origin (both assumptions guaranteed by options validation). + origin = sessions[0].session.Origin() ) metadata := repair.NewReplicaMetadataComparer(origin, r.rpopts) @@ -118,6 +135,7 @@ func (r shardRepairer) Repair( var ( accumLocalMetadata = block.NewFetchBlocksMetadataResults() pageToken PageToken + err error ) // Safe to register since by the time this function completes we won't be using the metadata // for anything anymore. @@ -147,13 +165,15 @@ func (r shardRepairer) Repair( } if r.rpopts.DebugShadowComparisonsEnabled() { - // Shadow comparison is mostly a debug feature that can be used to test new builds and diagnose - // issues with the repair feature. It should not be enabled for production use-cases. - err := r.shadowCompare(start, end, accumLocalMetadata, session, shard, nsCtx) - if err != nil { - r.logger.Error( - "Shadow compare failed", - zap.Error(err)) + for _, sesTopo := range sessions { + // Shadow comparison is mostly a debug feature that can be used to test new builds and diagnose + // issues with the repair feature. It should not be enabled for production use-cases. + err := r.shadowCompare(start, end, accumLocalMetadata, sesTopo.session, shard, nsCtx) + if err != nil { + r.logger.Error( + "Shadow compare failed", + zap.Error(err)) + } } } @@ -163,25 +183,30 @@ func (r shardRepairer) Repair( return repair.MetadataComparisonResult{}, err } - rsOpts := r.opts.RepairOptions().ResultOptions() - // Add peer metadata. - level := r.rpopts.RepairConsistencyLevel() - peerIter, err := session.FetchBlocksMetadataFromPeers(nsCtx.ID, shard.ID(), start, end, - level, rsOpts) - if err != nil { - return repair.MetadataComparisonResult{}, err - } - if err := metadata.AddPeerMetadata(peerIter); err != nil { - return repair.MetadataComparisonResult{}, err + var ( + rsOpts = r.opts.RepairOptions().ResultOptions() + level = r.rpopts.RepairConsistencyLevel() + ) + for _, sesTopo := range sessions { + // Add peer metadata. + peerIter, err := sesTopo.session.FetchBlocksMetadataFromPeers(nsCtx.ID, shard.ID(), start, end, + level, rsOpts) + if err != nil { + return repair.MetadataComparisonResult{}, err + } + if err := metadata.AddPeerMetadata(peerIter); err != nil { + return repair.MetadataComparisonResult{}, err + } } var ( - // TODO(rartoul): Pool this slice. - metadatasToFetchBlocksFor = []block.ReplicaMetadata{} - metadataRes = metadata.Compare() - seriesWithChecksumMismatches = metadataRes.ChecksumDifferences.Series() + // TODO(rartoul): Pool these slices. 
+ metadatasToFetchBlocksForPerSession = make([][]block.ReplicaMetadata, len(sessions)) + metadataRes = metadata.Compare() + seriesWithChecksumMismatches = metadataRes.ChecksumDifferences.Series() ) + originID := origin.ID() for _, e := range seriesWithChecksumMismatches.Iter() { for blockStart, replicaMetadataBlocks := range e.Value().Metadata.Blocks() { blStartTime := blockStart.ToTime() @@ -198,33 +223,70 @@ func (r shardRepairer) Repair( } for _, replicaMetadata := range replicaMetadataBlocks.Metadata() { - if replicaMetadata.Host.ID() == session.Origin().ID() { + metadataHostID := replicaMetadata.Host.ID() + if metadataHostID == originID { // Don't request blocks for self metadata. continue } - metadatasToFetchBlocksFor = append(metadatasToFetchBlocksFor, replicaMetadata) + + if len(sessions) == 1 { + // Optimized path for single session case. + metadatasToFetchBlocksForPerSession[0] = append(metadatasToFetchBlocksForPerSession[0], replicaMetadata) + continue + } + + // If there is more than one session then we need to match up all of the metadata to the + // session it belongs to so that we can fetch the corresponding blocks of data. + foundSessionForMetadata := false + for i, sesTopo := range sessions { + _, ok := sesTopo.topo.LookupHostShardSet(metadataHostID) + if !ok { + // The host this metadata came from is not part of the cluster this session is connected to. + continue + } + metadatasToFetchBlocksForPerSession[i] = append(metadatasToFetchBlocksForPerSession[i], replicaMetadata) + foundSessionForMetadata = true + break + } + + if !foundSessionForMetadata { + // Could happen during topology changes (I.E node is kicked out of the cluster in-between + // fetching its metadata and this step). + r.logger.Debug( + "could not identify which session mismatched metadata belong to", + zap.String("hostID", metadataHostID), + zap.Time("blockStart", blStartTime), + ) + } } } } - perSeriesReplicaIter, err := session.FetchBlocksFromPeers(nsMeta, shard.ID(), level, metadatasToFetchBlocksFor, rsOpts) - if err != nil { - return repair.MetadataComparisonResult{}, err - } - // TODO(rartoul): Copying the IDs for the purposes of the map key is wasteful. Considering using // SetUnsafe or marking as NoFinalize() and making the map check IsNoFinalize(). 
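+	// Blocks fetched for the same series and block start from different sessions are merged below
+	// via DatabaseBlock.Merge, so a series that mismatches against both the local and replicated
+	// clusters ends up as a single combined block in the shard result.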
numMismatchSeries := seriesWithChecksumMismatches.Len() results := result.NewShardResult(numMismatchSeries, rsOpts) - for perSeriesReplicaIter.Next() { - _, id, block := perSeriesReplicaIter.Current() - // TODO(rartoul): Handle tags in both branches: https://github.com/m3db/m3/issues/1848 - if existing, ok := results.BlockAt(id, block.StartTime()); ok { - if err := existing.Merge(block); err != nil { - return repair.MetadataComparisonResult{}, err + for i, metadatasToFetchBlocksFor := range metadatasToFetchBlocksForPerSession { + if len(metadatasToFetchBlocksFor) == 0 { + continue + } + + session := sessions[i].session + perSeriesReplicaIter, err := session.FetchBlocksFromPeers(nsMeta, shard.ID(), level, metadatasToFetchBlocksFor, rsOpts) + if err != nil { + return repair.MetadataComparisonResult{}, err + } + + for perSeriesReplicaIter.Next() { + _, id, block := perSeriesReplicaIter.Current() + // TODO(rartoul): Handle tags in both branches: https://github.com/m3db/m3/issues/1848 + if existing, ok := results.BlockAt(id, block.StartTime()); ok { + if err := existing.Merge(block); err != nil { + return repair.MetadataComparisonResult{}, err + } + } else { + results.AddBlock(id, ident.Tags{}, block) } - } else { - results.AddBlock(id, ident.Tags{}, block) } } @@ -762,3 +824,8 @@ func (r shardRepairer) shadowCompare( return nil } + +type sessionAndTopo struct { + session client.AdminSession + topo topology.Map +} diff --git a/src/dbnode/storage/repair/options.go b/src/dbnode/storage/repair/options.go index f3ba6e4dde..3555f6eabf 100644 --- a/src/dbnode/storage/repair/options.go +++ b/src/dbnode/storage/repair/options.go @@ -22,6 +22,7 @@ package repair import ( "errors" + "fmt" "time" "github.com/m3db/m3/src/dbnode/client" @@ -50,7 +51,7 @@ var ( ) type options struct { - adminClient client.AdminClient + adminClients []client.AdminClient repairConsistencyLevel topology.ReadConsistencyLevel repairShardConcurrency int repairCheckInterval time.Duration @@ -75,14 +76,14 @@ func NewOptions() Options { } } -func (o *options) SetAdminClient(value client.AdminClient) Options { +func (o *options) SetAdminClients(value []client.AdminClient) Options { opts := *o - opts.adminClient = value + opts.adminClients = value return &opts } -func (o *options) AdminClient() client.AdminClient { - return o.adminClient +func (o *options) AdminClients() []client.AdminClient { + return o.adminClients } func (o *options) SetRepairConsistencyLevel(value topology.ReadConsistencyLevel) Options { @@ -166,9 +167,25 @@ func (o *options) DebugShadowComparisonsPercentage() float64 { } func (o *options) Validate() error { - if o.adminClient == nil { + if len(o.adminClients) == 0 { return errNoAdminClient } + + var prevOrigin string + for _, c := range o.adminClients { + currOrigin := c.Options().(client.AdminOptions).Origin().ID() + if prevOrigin == "" { + prevOrigin = currOrigin + continue + } + + if currOrigin != prevOrigin { + return fmt.Errorf( + "all repair clients should have the same origin, prev: %s, curr: %s", + prevOrigin, currOrigin) + } + } + if o.repairCheckInterval < 0 { return errInvalidRepairCheckInterval } diff --git a/src/dbnode/storage/repair/types.go b/src/dbnode/storage/repair/types.go index 73b098a057..3754c3f6ab 100644 --- a/src/dbnode/storage/repair/types.go +++ b/src/dbnode/storage/repair/types.go @@ -145,10 +145,10 @@ type MetadataComparisonResult struct { // Options are the repair options type Options interface { // SetAdminClient sets the admin client. 
- SetAdminClient(value client.AdminClient) Options + SetAdminClients(value []client.AdminClient) Options // AdminClient returns the admin client. - AdminClient() client.AdminClient + AdminClients() []client.AdminClient // SetRepairConsistencyLevel sets the repair read level consistency // for which to repair shards with. diff --git a/src/dbnode/storage/repair_test.go b/src/dbnode/storage/repair_test.go index 526ef10fa1..662040b5fd 100644 --- a/src/dbnode/storage/repair_test.go +++ b/src/dbnode/storage/repair_test.go @@ -111,12 +111,14 @@ func TestDatabaseShardRepairerRepair(t *testing.T) { session := client.NewMockAdminSession(ctrl) session.EXPECT().Origin().Return(topology.NewHost("0", "addr0")).AnyTimes() + session.EXPECT().TopologyMap() mockClient := client.NewMockAdminClient(ctrl) mockClient.EXPECT().DefaultAdminSession().Return(session, nil) var ( - rpOpts = testRepairOptions(ctrl).SetAdminClient(mockClient) + rpOpts = testRepairOptions(ctrl). + SetAdminClients([]client.AdminClient{mockClient}) now = time.Now() nowFn = func() time.Time { return now } opts = DefaultTestOptions() @@ -285,6 +287,244 @@ func TestDatabaseShardRepairerRepair(t *testing.T) { require.Equal(t, expected, currBlock.Metadata()) } +type multiSessionTestMock struct { + host topology.Host + client *client.MockAdminClient + session *client.MockAdminSession + topoMap *topology.MockMap +} + +func TestDatabaseShardRepairerRepairMultiSession(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // Origin is always zero (on both clients) and hosts[0] and hosts[1] + // represents other nodes in different clusters. + origin := topology.NewHost("0", "addr0") + mocks := []multiSessionTestMock{ + multiSessionTestMock{ + host: topology.NewHost("1", "addr1"), + client: client.NewMockAdminClient(ctrl), + session: client.NewMockAdminSession(ctrl), + topoMap: topology.NewMockMap(ctrl), + }, + multiSessionTestMock{ + host: topology.NewHost("2", "addr2"), + client: client.NewMockAdminClient(ctrl), + session: client.NewMockAdminSession(ctrl), + topoMap: topology.NewMockMap(ctrl), + }, + } + + var mockClients []client.AdminClient + var hosts []topology.Host + for _, mock := range mocks { + mock.session.EXPECT().Origin().Return(origin).AnyTimes() + mock.client.EXPECT().DefaultAdminSession().Return(mock.session, nil) + mock.session.EXPECT().TopologyMap().Return(mock.topoMap, nil) + mockClients = append(mockClients, mock.client) + hosts = append(hosts, mock.host) + } + + var ( + rpOpts = testRepairOptions(ctrl). + SetAdminClients(mockClients) + now = time.Now() + nowFn = func() time.Time { return now } + opts = DefaultTestOptions() + copts = opts.ClockOptions() + iopts = opts.InstrumentOptions() + rtopts = defaultTestRetentionOpts + ) + + opts = opts. + SetClockOptions(copts.SetNowFn(nowFn)). 
diff --git a/src/dbnode/storage/repair_test.go b/src/dbnode/storage/repair_test.go
index 526ef10fa1..662040b5fd 100644
--- a/src/dbnode/storage/repair_test.go
+++ b/src/dbnode/storage/repair_test.go
@@ -111,12 +111,14 @@ func TestDatabaseShardRepairerRepair(t *testing.T) {

 	session := client.NewMockAdminSession(ctrl)
 	session.EXPECT().Origin().Return(topology.NewHost("0", "addr0")).AnyTimes()
+	session.EXPECT().TopologyMap()

 	mockClient := client.NewMockAdminClient(ctrl)
 	mockClient.EXPECT().DefaultAdminSession().Return(session, nil)

 	var (
-		rpOpts = testRepairOptions(ctrl).SetAdminClient(mockClient)
+		rpOpts = testRepairOptions(ctrl).
+			SetAdminClients([]client.AdminClient{mockClient})
 		now    = time.Now()
 		nowFn  = func() time.Time { return now }
 		opts   = DefaultTestOptions()
@@ -285,6 +287,244 @@ func TestDatabaseShardRepairerRepair(t *testing.T) {
 	require.Equal(t, expected, currBlock.Metadata())
 }

+type multiSessionTestMock struct {
+	host    topology.Host
+	client  *client.MockAdminClient
+	session *client.MockAdminSession
+	topoMap *topology.MockMap
+}
+
+func TestDatabaseShardRepairerRepairMultiSession(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	// Origin is always node "0" (on both clients); hosts[0] and hosts[1]
+	// represent other nodes in different clusters.
+	origin := topology.NewHost("0", "addr0")
+	mocks := []multiSessionTestMock{
+		multiSessionTestMock{
+			host:    topology.NewHost("1", "addr1"),
+			client:  client.NewMockAdminClient(ctrl),
+			session: client.NewMockAdminSession(ctrl),
+			topoMap: topology.NewMockMap(ctrl),
+		},
+		multiSessionTestMock{
+			host:    topology.NewHost("2", "addr2"),
+			client:  client.NewMockAdminClient(ctrl),
+			session: client.NewMockAdminSession(ctrl),
+			topoMap: topology.NewMockMap(ctrl),
+		},
+	}
+
+	var mockClients []client.AdminClient
+	var hosts []topology.Host
+	for _, mock := range mocks {
+		mock.session.EXPECT().Origin().Return(origin).AnyTimes()
+		mock.client.EXPECT().DefaultAdminSession().Return(mock.session, nil)
+		mock.session.EXPECT().TopologyMap().Return(mock.topoMap, nil)
+		mockClients = append(mockClients, mock.client)
+		hosts = append(hosts, mock.host)
+	}
+
+	var (
+		rpOpts = testRepairOptions(ctrl).
+			SetAdminClients(mockClients)
+		now    = time.Now()
+		nowFn  = func() time.Time { return now }
+		opts   = DefaultTestOptions()
+		copts  = opts.ClockOptions()
+		iopts  = opts.InstrumentOptions()
+		rtopts = defaultTestRetentionOpts
+	)
+
+	opts = opts.
+		SetClockOptions(copts.SetNowFn(nowFn)).
+		SetInstrumentOptions(iopts.SetMetricsScope(tally.NoopScope))
+
+	var (
+		namespaceID     = ident.StringID("testNamespace")
+		start           = now
+		end             = now.Add(rtopts.BlockSize())
+		repairTimeRange = xtime.Range{Start: start, End: end}
+		fetchOpts       = block.FetchBlocksMetadataOptions{
+			IncludeSizes:     true,
+			IncludeChecksums: true,
+			IncludeLastRead:  false,
+		}
+
+		sizes     = []int64{1, 2, 3, 4}
+		checksums = []uint32{4, 5, 6, 7}
+		lastRead  = now.Add(-time.Minute)
+		shardID   = uint32(0)
+		shard     = NewMockdatabaseShard(ctrl)
+	)
+
+	expectedResults := block.NewFetchBlocksMetadataResults()
+	results := block.NewFetchBlockMetadataResults()
+	results.Add(block.NewFetchBlockMetadataResult(now.Add(30*time.Minute),
+		sizes[0], &checksums[0], lastRead, nil))
+	results.Add(block.NewFetchBlockMetadataResult(now.Add(time.Hour),
+		sizes[1], &checksums[1], lastRead, nil))
+	expectedResults.Add(block.NewFetchBlocksMetadataResult(ident.StringID("foo"), nil, results))
+	results = block.NewFetchBlockMetadataResults()
+	results.Add(block.NewFetchBlockMetadataResult(now.Add(30*time.Minute),
+		sizes[2], &checksums[2], lastRead, nil))
+	expectedResults.Add(block.NewFetchBlocksMetadataResult(ident.StringID("bar"), nil, results))
+
+	var (
+		any             = gomock.Any()
+		nonNilPageToken = PageToken("non-nil-page-token")
+	)
+	// Ensure that the Repair logic will call FetchBlocksMetadataV2 in a loop until
+	// it receives a nil page token.
+	shard.EXPECT().
+		FetchBlocksMetadataV2(any, start, end, any, nil, fetchOpts).
+		Return(nil, nonNilPageToken, nil)
+	shard.EXPECT().
+		FetchBlocksMetadataV2(any, start, end, any, nonNilPageToken, fetchOpts).
+		Return(expectedResults, nil, nil)
+	shard.EXPECT().ID().Return(shardID).AnyTimes()
+	shard.EXPECT().Load(gomock.Any())
+
+	inputBlocks := []block.ReplicaMetadata{
+		{
+			Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now.Add(30*time.Minute), sizes[0], &checksums[0], lastRead),
+		},
+		{
+			Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now.Add(time.Hour), sizes[0], &checksums[1], lastRead),
+		},
+		{
+			// Mismatched checksum, so this series should trigger a repair.
+			Metadata: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, now.Add(30*time.Minute), sizes[2], &checksums[3], lastRead),
+		},
+	}
+
+	for i, mock := range mocks {
+		mockTopoMap := mock.topoMap
+		for _, host := range hosts {
+			iClosure := i
+			mockTopoMap.EXPECT().LookupHostShardSet(host.ID()).DoAndReturn(func(id string) (topology.HostShardSet, bool) {
+				if iClosure == 0 && id == hosts[0].ID() {
+					return nil, true
+				}
+				if iClosure == 1 && id == hosts[1].ID() {
+					return nil, true
+				}
+				return nil, false
+			}).AnyTimes()
+		}
+	}
+
+	nsMeta, err := namespace.NewMetadata(namespaceID, namespace.NewOptions())
+	for i, mock := range mocks {
+		session := mock.session
+		// Make a copy of the input blocks where the host is set to the host for
+		// the cluster associated with the current session.
+		inputBlocksForSession := make([]block.ReplicaMetadata, len(inputBlocks))
+		copy(inputBlocksForSession, inputBlocks)
+		for j := range inputBlocksForSession {
+			inputBlocksForSession[j].Host = hosts[i]
+		}
+
+		peerIter := client.NewMockPeerBlockMetadataIter(ctrl)
+		gomock.InOrder(
+			peerIter.EXPECT().Next().Return(true),
+			peerIter.EXPECT().Current().Return(inputBlocksForSession[0].Host, inputBlocks[0].Metadata),
+			peerIter.EXPECT().Next().Return(true),
+			peerIter.EXPECT().Current().Return(inputBlocksForSession[1].Host, inputBlocks[1].Metadata),
+			peerIter.EXPECT().Next().Return(true),
+			peerIter.EXPECT().Current().Return(inputBlocksForSession[2].Host, inputBlocks[2].Metadata),
+			peerIter.EXPECT().Next().Return(false),
+			peerIter.EXPECT().Err().Return(nil),
+		)
+		session.EXPECT().
+			FetchBlocksMetadataFromPeers(namespaceID, shardID, start, end,
+				rpOpts.RepairConsistencyLevel(), gomock.Any()).
+			Return(peerIter, nil)
+
+		peerBlocksIter := client.NewMockPeerBlocksIter(ctrl)
+		dbBlock1 := block.NewMockDatabaseBlock(ctrl)
+		dbBlock1.EXPECT().StartTime().Return(inputBlocksForSession[2].Metadata.Start).AnyTimes()
+		dbBlock2 := block.NewMockDatabaseBlock(ctrl)
+		dbBlock2.EXPECT().StartTime().Return(inputBlocksForSession[2].Metadata.Start).AnyTimes()
+		// Ensure merging logic works. Need AnyTimes() because Merge() will only be called on dbBlock1
+		// for the first session (all subsequent blocks from other sessions will get merged into dbBlock1
+		// from the first session).
+		dbBlock1.EXPECT().Merge(dbBlock2).AnyTimes()
+		gomock.InOrder(
+			peerBlocksIter.EXPECT().Next().Return(true),
+			peerBlocksIter.EXPECT().Current().Return(inputBlocksForSession[2].Host, inputBlocks[2].Metadata.ID, dbBlock1),
+			peerBlocksIter.EXPECT().Next().Return(true),
+			peerBlocksIter.EXPECT().Current().Return(inputBlocksForSession[2].Host, inputBlocks[2].Metadata.ID, dbBlock2),
+			peerBlocksIter.EXPECT().Next().Return(false),
+		)
+		require.NoError(t, err)
+		session.EXPECT().
+			FetchBlocksFromPeers(nsMeta, shardID, rpOpts.RepairConsistencyLevel(), inputBlocksForSession[2:], gomock.Any()).
+			Return(peerBlocksIter, nil)
+	}
+
+	var (
+		resNamespace ident.ID
+		resShard     databaseShard
+		resDiff      repair.MetadataComparisonResult
+	)
+
+	databaseShardRepairer := newShardRepairer(opts, rpOpts)
+	repairer := databaseShardRepairer.(shardRepairer)
+	repairer.recordFn = func(nsID ident.ID, shard databaseShard, diffRes repair.MetadataComparisonResult) {
+		resNamespace = nsID
+		resShard = shard
+		resDiff = diffRes
+	}
+
+	var (
+		ctx   = context.NewContext()
+		nsCtx = namespace.Context{ID: namespaceID}
+	)
+	require.NoError(t, err)
+	repairer.Repair(ctx, nsCtx, nsMeta, repairTimeRange, shard)
+
+	require.Equal(t, namespaceID, resNamespace)
+	require.Equal(t, resShard, shard)
+	require.Equal(t, int64(2), resDiff.NumSeries)
+	require.Equal(t, int64(3), resDiff.NumBlocks)
+
+	checksumDiffSeries := resDiff.ChecksumDifferences.Series()
+	require.Equal(t, 1, checksumDiffSeries.Len())
+	series, exists := checksumDiffSeries.Get(ident.StringID("bar"))
+	require.True(t, exists)
+	blocks := series.Metadata.Blocks()
+	require.Equal(t, 1, len(blocks))
+	currBlock, exists := blocks[xtime.ToUnixNano(now.Add(30*time.Minute))]
+	require.True(t, exists)
+	require.Equal(t, now.Add(30*time.Minute), currBlock.Start())
+	expected := []block.ReplicaMetadata{
+		// Checksum difference for series "bar".
+		{Host: origin, Metadata: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, now.Add(30*time.Minute), sizes[2], &checksums[2], lastRead)},
+		{Host: hosts[0], Metadata: inputBlocks[2].Metadata},
+		{Host: hosts[1], Metadata: inputBlocks[2].Metadata},
+	}
+	require.Equal(t, expected, currBlock.Metadata())
+
+	sizeDiffSeries := resDiff.SizeDifferences.Series()
+	require.Equal(t, 1, sizeDiffSeries.Len())
+	series, exists = sizeDiffSeries.Get(ident.StringID("foo"))
+	require.True(t, exists)
+	blocks = series.Metadata.Blocks()
+	require.Equal(t, 1, len(blocks))
+	currBlock, exists = blocks[xtime.ToUnixNano(now.Add(time.Hour))]
+	require.True(t, exists)
+	require.Equal(t, now.Add(time.Hour), currBlock.Start())
+	expected = []block.ReplicaMetadata{
+		// Size difference for series "foo".
+		{Host: origin, Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now.Add(time.Hour), sizes[1], &checksums[1], lastRead)},
+		{Host: hosts[0], Metadata: inputBlocks[1].Metadata},
+		{Host: hosts[1], Metadata: inputBlocks[1].Metadata},
+	}
+	require.Equal(t, expected, currBlock.Metadata())
+}
+
 type expectedRepair struct {
 	repairRange xtime.Range
 }
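Both mock clients in the new test report origin host "0", which is exactly the invariant Options.Validate() enforces in options.go above. A dependency-free sketch of that origin check, using plain string IDs in place of the admin clients' origins (the function name is illustrative):

import (
    "errors"
    "fmt"
)

// validateSameOrigin mirrors the origin check added to Options.Validate():
// every configured repair client must report the same origin host ID.
func validateSameOrigin(originIDs []string) error {
    if len(originIDs) == 0 {
        return errors.New("no admin clients configured for repair")
    }
    prev := originIDs[0]
    for _, curr := range originIDs[1:] {
        if curr != prev {
            return fmt.Errorf(
                "all repair clients should have the same origin, prev: %s, curr: %s",
                prev, curr)
        }
    }
    return nil
}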