diff --git a/scripts/docker-integration-tests/aggregator/m3coordinator.yml b/scripts/docker-integration-tests/aggregator/m3coordinator.yml index 8d3afe98b6..10ba278c91 100644 --- a/scripts/docker-integration-tests/aggregator/m3coordinator.yml +++ b/scripts/docker-integration-tests/aggregator/m3coordinator.yml @@ -12,15 +12,7 @@ carbon: retention: 6h clusters: - - namespaces: - - namespace: agg - type: aggregated - resolution: 10s - retention: 6h - - namespace: unagg - type: unaggregated - retention: 1s - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/aggregator/test.sh b/scripts/docker-integration-tests/aggregator/test.sh index 0c48d2ddd5..8d287df7e0 100755 --- a/scripts/docker-integration-tests/aggregator/test.sh +++ b/scripts/docker-integration-tests/aggregator/test.sh @@ -29,7 +29,7 @@ function defer { trap defer EXIT echo "Setup DB node" -setup_single_m3db_node +AGG_RESOLUTION=10s AGG_RETENTION=6h setup_single_m3db_node echo "Initializing aggregator topology" curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/services/m3aggregator/placement/init -d '{ diff --git a/scripts/docker-integration-tests/aggregator_legacy/m3coordinator.yml b/scripts/docker-integration-tests/aggregator_legacy/m3coordinator.yml index 3649b73580..acc91e156e 100644 --- a/scripts/docker-integration-tests/aggregator_legacy/m3coordinator.yml +++ b/scripts/docker-integration-tests/aggregator_legacy/m3coordinator.yml @@ -12,15 +12,7 @@ carbon: retention: 6h clusters: - - namespaces: - - namespace: agg - type: aggregated - resolution: 10s - retention: 6h - - namespace: unagg - type: unaggregated - retention: 1s - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/aggregator_legacy/test.sh b/scripts/docker-integration-tests/aggregator_legacy/test.sh index 5a116ececd..4171fd3b09 100755 --- a/scripts/docker-integration-tests/aggregator_legacy/test.sh +++ 
b/scripts/docker-integration-tests/aggregator_legacy/test.sh @@ -22,7 +22,7 @@ function defer { trap defer EXIT echo "Setup DB node" -setup_single_m3db_node +AGG_RESOLUTION=10s AGG_RETENTION=6h setup_single_m3db_node echo "Initializing aggregator topology" curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/services/m3aggregator/placement/init -d '{ diff --git a/scripts/docker-integration-tests/carbon/m3coordinator.yml b/scripts/docker-integration-tests/carbon/m3coordinator.yml index a883cdbc0c..dcff5f3a08 100644 --- a/scripts/docker-integration-tests/carbon/m3coordinator.yml +++ b/scripts/docker-integration-tests/carbon/m3coordinator.yml @@ -1,13 +1,5 @@ clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 5s - - namespace: unagg - type: unaggregated - retention: 10m - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/carbon/test.sh b/scripts/docker-integration-tests/carbon/test.sh index d88c88a534..3094318a62 100755 --- a/scripts/docker-integration-tests/carbon/test.sh +++ b/scripts/docker-integration-tests/carbon/test.sh @@ -19,7 +19,7 @@ function defer { } trap defer EXIT -setup_single_m3db_node +AGG_RESOLUTION=5s setup_single_m3db_node function read_carbon { target=$1 diff --git a/scripts/docker-integration-tests/cold_writes_simple/m3coordinator.yml b/scripts/docker-integration-tests/cold_writes_simple/m3coordinator.yml index 37ff9ff20f..cc33cf4021 100644 --- a/scripts/docker-integration-tests/cold_writes_simple/m3coordinator.yml +++ b/scripts/docker-integration-tests/cold_writes_simple/m3coordinator.yml @@ -3,15 +3,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10h - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/common.sh 
b/scripts/docker-integration-tests/common.sh index ab54fd4c3a..918ab3b76a 100644 --- a/scripts/docker-integration-tests/common.sh +++ b/scripts/docker-integration-tests/common.sh @@ -74,10 +74,32 @@ function setup_single_m3db_node_long_namespaces { ] }' + echo "Updating aggregation options for agg namespace" + curl -vvvsSf -X PUT 0.0.0.0:${coordinator_port}/api/v1/services/m3db/namespace -d '{ + "name": "agg", + "options": { + "aggregationOptions": { + "aggregations": [ + { + "aggregated": true, + "attributes": { + "resolutionDuration": "30s", + "downsampleOptions": { "all": false } + } + } + ] + } + } + }' + echo "Wait until placement is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/placement | jq .placement.instances.'${dbnode_id}'.id)" == \"'${dbnode_id}'\" ]' + echo "Wait until agg namespace is ready" + ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/namespace/ready -d "{ \"name\": \"agg\"}" | grep -c true)" -eq 1 ]' + wait_for_namespaces echo "Adding agg2d namespace" @@ -86,10 +108,31 @@ function setup_single_m3db_node_long_namespaces { "retentionTime": "48h" }' + echo "Updating aggregation options for agg2d namespace" + curl -vvvsSf -X PUT 0.0.0.0:${coordinator_port}/api/v1/services/m3db/namespace -d '{ + "name": "agg2d", + "options": { + "aggregationOptions": { + "aggregations": [ + { + "aggregated": true, + "attributes": { + "resolutionDuration": "1m", + "downsampleOptions": { "all": false } + } + } + ] + } + } + }' + echo "Wait until agg2d namespace is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/namespace | jq .registry.namespaces.agg2d.indexOptions.enabled)" == true ]' + echo "Wait until agg2d namespace is ready" + ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf
0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/namespace/ready -d "{ \"name\": \"agg2d\"}" | grep -c true)" -eq 1 ]' echo "Wait until bootstrapped" ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ @@ -103,6 +146,8 @@ function setup_single_m3db_node { local dbnode_id=${DBNODE_ID:-m3db_local} local coordinator_port=${COORDINATOR_PORT:-7201} local zone=${ZONE:-embedded} + local agg_resolution=${AGG_RESOLUTION:-15s} + local agg_retention=${AGG_RETENTION:-10h} echo "Wait for API to be available" ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ @@ -112,7 +157,7 @@ function setup_single_m3db_node { curl -vvvsSf -X POST 0.0.0.0:${coordinator_port}/api/v1/database/create -d '{ "type": "cluster", "namespaceName": "agg", - "retentionTime": "6h", + "retentionTime": "'${agg_retention}'", "num_shards": 4, "replicationFactor": 1, "hosts": [ @@ -127,10 +172,31 @@ function setup_single_m3db_node { ] }' + echo "Updating aggregation options for agg namespace" + curl -vvvsSf -X PUT 0.0.0.0:${coordinator_port}/api/v1/services/m3db/namespace -d '{ + "name": "agg", + "options": { + "aggregationOptions": { + "aggregations": [ + { + "aggregated": true, + "attributes": { + "resolutionDuration": "'${agg_resolution}'" + } + } + ] + } + } + }' + echo "Wait until placement is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/placement | jq .placement.instances.'${dbnode_id}'.id)" == \"'${dbnode_id}'\" ]' + echo "Wait until agg namespace is ready" + ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/namespace/ready -d "{ \"name\": \"agg\"}" | grep -c true)" -eq 1 ]' + wait_for_namespaces echo "Wait until bootstrapped" @@ -147,6 +213,8 @@ function setup_two_m3db_nodes { local dbnode_host_1_health_port=${DBNODE_HEALTH_PORT_01:-9012} local dbnode_host_2_health_port=${DBNODE_HEALTH_PORT_02:-9022} local 
coordinator_port=${COORDINATOR_PORT:-7201} + local agg_resolution=${AGG_RESOLUTION:-15s} + local agg_retention=${AGG_RETENTION:-10h} echo "Wait for API to be available" ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ @@ -156,7 +224,7 @@ function setup_two_m3db_nodes { curl -vvvsSf -X POST 0.0.0.0:${coordinator_port}/api/v1/database/create -d '{ "type": "cluster", "namespaceName": "agg", - "retentionTime": "6h", + "retentionTime": "'${agg_retention}'", "num_shards": 2, "replicationFactor": 2, "hosts": [ @@ -179,10 +247,31 @@ function setup_two_m3db_nodes { ] }' + echo "Updating aggregation options for agg namespace" + curl -vvvsSf -X PUT 0.0.0.0:${coordinator_port}/api/v1/services/m3db/namespace -d '{ + "name": "agg", + "options": { + "aggregationOptions": { + "aggregations": [ + { + "aggregated": true, + "attributes": { + "resolutionDuration": "'${agg_resolution}'" + } + } + ] + } + } + }' + echo "Wait until placement is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/placement | jq .placement.instances.'"${dbnode_id_1}"'.id)" == \"'"${dbnode_id_1}"'\" ]' + echo "Wait until agg namespace is ready" + ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/namespace/ready -d "{ \"name\": \"agg\"}" | grep -c true)" -eq 1 ]' + wait_for_namespaces echo "Wait until bootstrapped" @@ -194,6 +283,7 @@ function setup_two_m3db_nodes { function wait_for_namespaces { local coordinator_port=${COORDINATOR_PORT:-7201} + local unagg_retention=${UNAGG_RETENTION:-10h} echo "Wait until agg namespace is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ @@ -202,13 +292,17 @@ function wait_for_namespaces { echo "Adding unagg namespace" curl -vvvsSf -X POST 0.0.0.0:${coordinator_port}/api/v1/database/namespace/create -d '{ "namespaceName": "unagg", - "retentionTime": "6h" + "retentionTime": 
"'${unagg_retention}'" }' echo "Wait until unagg namespace is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' + echo "Wait until unagg namespace is ready" + ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/services/m3db/namespace/ready -d "{ \"name\": \"unagg\"}" | grep -c true)" -eq 1 ]' + echo "Adding coldWritesRepairAndNoIndex namespace" curl -vvvsSf -X POST 0.0.0.0:${coordinator_port}/api/v1/services/m3db/namespace -d '{ "name": "coldWritesRepairAndNoIndex", @@ -227,6 +321,11 @@ function wait_for_namespaces { "bufferPastDuration": "10m", "blockDataExpiry": true, "blockDataExpiryAfterNotAccessPeriodDuration": "5m" + }, + "aggregationOptions": { + "aggregations": [ + { "aggregated": false } + ] } } }' diff --git a/scripts/docker-integration-tests/coordinator_config_rules/m3coordinator.yml b/scripts/docker-integration-tests/coordinator_config_rules/m3coordinator.yml index 46a9f4cdc7..952c9a7ddf 100644 --- a/scripts/docker-integration-tests/coordinator_config_rules/m3coordinator.yml +++ b/scripts/docker-integration-tests/coordinator_config_rules/m3coordinator.yml @@ -1,21 +1,5 @@ clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 24h - resolution: 30s - downsample: - all: false - - namespace: agg2d - type: aggregated - retention: 48h - resolution: 1m - downsample: - all: false - - namespace: unagg - type: unaggregated - retention: 10m - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/multi_cluster_write/m3coordinator-cluster-a.yml b/scripts/docker-integration-tests/multi_cluster_write/m3coordinator-cluster-a.yml index 9435c4aaf6..e393f2ef22 100644 --- a/scripts/docker-integration-tests/multi_cluster_write/m3coordinator-cluster-a.yml +++ 
b/scripts/docker-integration-tests/multi_cluster_write/m3coordinator-cluster-a.yml @@ -3,15 +3,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10h - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/multi_cluster_write/m3coordinator-cluster-b.yml b/scripts/docker-integration-tests/multi_cluster_write/m3coordinator-cluster-b.yml index 7f567e157d..7933a84078 100644 --- a/scripts/docker-integration-tests/multi_cluster_write/m3coordinator-cluster-b.yml +++ b/scripts/docker-integration-tests/multi_cluster_write/m3coordinator-cluster-b.yml @@ -3,15 +3,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10h - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/prometheus/m3coordinator.yml b/scripts/docker-integration-tests/prometheus/m3coordinator.yml index 92edee33fc..cd1a97eb82 100644 --- a/scripts/docker-integration-tests/prometheus/m3coordinator.yml +++ b/scripts/docker-integration-tests/prometheus/m3coordinator.yml @@ -3,15 +3,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10m - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/prometheus_replication/m3coordinator01.yml b/scripts/docker-integration-tests/prometheus_replication/m3coordinator01.yml index 690e81eac6..426653f6cc 100644 --- a/scripts/docker-integration-tests/prometheus_replication/m3coordinator01.yml +++ b/scripts/docker-integration-tests/prometheus_replication/m3coordinator01.yml @@ -4,15 +4,7 @@ writeForwarding: - url: 
http://coordinator02:7201/api/v1/prom/remote/write clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10m - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/prometheus_replication/m3coordinator02.yml b/scripts/docker-integration-tests/prometheus_replication/m3coordinator02.yml index b593362683..d7a81d02b4 100644 --- a/scripts/docker-integration-tests/prometheus_replication/m3coordinator02.yml +++ b/scripts/docker-integration-tests/prometheus_replication/m3coordinator02.yml @@ -1,13 +1,5 @@ clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10m - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-a.yml b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-a.yml index 85bf17a51d..d9703deed2 100644 --- a/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-a.yml +++ b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-a.yml @@ -9,15 +9,7 @@ rpc: remoteListenAddresses: ["coordinator-cluster-c:7202"] clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 5s - - namespace: unagg - type: unaggregated - retention: 10m - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-b.yml b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-b.yml index 07be9bbc8e..7e27bb0a00 100644 --- a/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-b.yml +++ b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-b.yml @@ -9,15 +9,7 @@ rpc: remoteListenAddresses: ["coordinator-cluster-c:7202"] clusters: - - namespaces: - - namespace: agg - type: 
aggregated - retention: 10h - resolution: 5s - - namespace: unagg - type: unaggregated - retention: 10m - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-c.yml b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-c.yml index 21e70608e7..2ec60faa05 100644 --- a/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-c.yml +++ b/scripts/docker-integration-tests/query_fanout/m3coordinator-cluster-c.yml @@ -9,15 +9,7 @@ rpc: remoteListenAddresses: ["coordinator-cluster-b:7202"] clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 5s - - namespace: unagg - type: unaggregated - retention: 10m - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/query_fanout/test.sh b/scripts/docker-integration-tests/query_fanout/test.sh index 5a4321e2dc..413ee4fc53 100755 --- a/scripts/docker-integration-tests/query_fanout/test.sh +++ b/scripts/docker-integration-tests/query_fanout/test.sh @@ -28,13 +28,13 @@ function defer { } trap defer EXIT -DBNODE_HOST=dbnode-cluster-a DBDNODE_PORT=9000 DBNODE_HEALTH_PORT=9002 COORDINATOR_PORT=7201 \ +AGG_RESOLUTION=5s DBNODE_HOST=dbnode-cluster-a DBDNODE_PORT=9000 DBNODE_HEALTH_PORT=9002 COORDINATOR_PORT=7201 \ setup_single_m3db_node -DBNODE_HOST=dbnode-cluster-b DBDNODE_PORT=19000 DBNODE_HEALTH_PORT=19002 COORDINATOR_PORT=17201 \ +AGG_RESOLUTION=5s DBNODE_HOST=dbnode-cluster-b DBDNODE_PORT=19000 DBNODE_HEALTH_PORT=19002 COORDINATOR_PORT=17201 \ setup_single_m3db_node -DBNODE_HOST=dbnode-cluster-c DBDNODE_PORT=29000 DBNODE_HEALTH_PORT=29002 COORDINATOR_PORT=27201 \ +AGG_RESOLUTION=5s DBNODE_HOST=dbnode-cluster-c DBDNODE_PORT=29000 DBNODE_HEALTH_PORT=29002 COORDINATOR_PORT=27201 \ setup_single_m3db_node echo "Write data to cluster a" diff --git a/scripts/docker-integration-tests/repair/m3coordinator.yml 
b/scripts/docker-integration-tests/repair/m3coordinator.yml index 37ff9ff20f..cc33cf4021 100644 --- a/scripts/docker-integration-tests/repair/m3coordinator.yml +++ b/scripts/docker-integration-tests/repair/m3coordinator.yml @@ -3,15 +3,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10h - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-a.yml b/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-a.yml index fdfefaeb3a..6695f7be3c 100644 --- a/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-a.yml +++ b/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-a.yml @@ -18,15 +18,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10h - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-b.yml b/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-b.yml index 7f567e157d..7933a84078 100644 --- a/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-b.yml +++ b/scripts/docker-integration-tests/repair_and_replication/m3coordinator-cluster-b.yml @@ -3,15 +3,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10h - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/replication/m3coordinator-cluster-a.yml b/scripts/docker-integration-tests/replication/m3coordinator-cluster-a.yml index 9435c4aaf6..e393f2ef22 
100644 --- a/scripts/docker-integration-tests/replication/m3coordinator-cluster-a.yml +++ b/scripts/docker-integration-tests/replication/m3coordinator-cluster-a.yml @@ -3,15 +3,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10h - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/replication/m3coordinator-cluster-b.yml b/scripts/docker-integration-tests/replication/m3coordinator-cluster-b.yml index 7f567e157d..7933a84078 100644 --- a/scripts/docker-integration-tests/replication/m3coordinator-cluster-b.yml +++ b/scripts/docker-integration-tests/replication/m3coordinator-cluster-b.yml @@ -3,15 +3,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10h - client: + - client: config: service: env: default_env diff --git a/scripts/docker-integration-tests/simple_v2_batch_apis/m3coordinator.yml b/scripts/docker-integration-tests/simple_v2_batch_apis/m3coordinator.yml index 83e9cf4df4..7d5cb12598 100644 --- a/scripts/docker-integration-tests/simple_v2_batch_apis/m3coordinator.yml +++ b/scripts/docker-integration-tests/simple_v2_batch_apis/m3coordinator.yml @@ -3,15 +3,7 @@ limits: maxFetchedSeries: 100 clusters: - - namespaces: - - namespace: agg - type: aggregated - retention: 10h - resolution: 15s - - namespace: unagg - type: unaggregated - retention: 10m - client: + - client: config: service: env: default_env diff --git a/src/cmd/services/m3coordinator/downsample/async_downsampler.go b/src/cmd/services/m3coordinator/downsample/async_downsampler.go index 9bb2f9f7f3..081c7ee884 100644 --- a/src/cmd/services/m3coordinator/downsample/async_downsampler.go +++ b/src/cmd/services/m3coordinator/downsample/async_downsampler.go @@ -92,3 +92,12 @@ func (d 
*asyncDownsampler) NewMetricsAppender() (MetricsAppender, error) { } return d.downsampler.NewMetricsAppender() } + +func (d *asyncDownsampler) Enabled() bool { + d.RLock() + defer d.RUnlock() + if d.err != nil { + return false + } + return d.downsampler.Enabled() +} diff --git a/src/cmd/services/m3coordinator/downsample/downsample_mock.go b/src/cmd/services/m3coordinator/downsample/downsample_mock.go index 2d04b1a4c8..5885b2d61a 100644 --- a/src/cmd/services/m3coordinator/downsample/downsample_mock.go +++ b/src/cmd/services/m3coordinator/downsample/downsample_mock.go @@ -54,6 +54,20 @@ func (m *MockDownsampler) EXPECT() *MockDownsamplerMockRecorder { return m.recorder } +// Enabled mocks base method +func (m *MockDownsampler) Enabled() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Enabled") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Enabled indicates an expected call of Enabled +func (mr *MockDownsamplerMockRecorder) Enabled() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enabled", reflect.TypeOf((*MockDownsampler)(nil).Enabled)) +} + // NewMetricsAppender mocks base method func (m *MockDownsampler) NewMetricsAppender() (MetricsAppender, error) { m.ctrl.T.Helper() diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go index 376d990e69..dd78b2372b 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/metrics/generated/proto/metricpb" "github.com/m3db/m3/src/query/storage/m3" + "github.com/m3db/m3/src/query/storage/m3/storagemetadata" "github.com/m3db/m3/src/query/ts" "go.uber.org/zap" @@ -35,6 +36,10 @@ import ( // Downsampler is a downsampler. type Downsampler interface { NewMetricsAppender() (MetricsAppender, error) + // Enabled indicates whether the downsampler is enabled or not. 
A + // downsampler is enabled if there are aggregated ClusterNamespaces + // that exist as downsampling only applies to aggregations. + Enabled() bool } // MetricsAppender is a metrics appender that can build a samples @@ -88,6 +93,7 @@ type downsampler struct { sync.RWMutex metricsAppenderOpts metricsAppenderOptions + enabled bool } type downsamplerOptions struct { @@ -145,6 +151,13 @@ func (d *downsampler) NewMetricsAppender() (MetricsAppender, error) { return metricsAppender, nil } +func (d *downsampler) Enabled() bool { + d.RLock() + defer d.RUnlock() + + return d.enabled +} + func (d *downsampler) OnUpdate(namespaces m3.ClusterNamespaces) { logger := d.opts.InstrumentOptions.Logger() @@ -153,6 +166,15 @@ func (d *downsampler) OnUpdate(namespaces m3.ClusterNamespaces) { return } + var hasAggregatedNamespaces bool + for _, namespace := range namespaces { + attrs := namespace.Options().Attributes() + if attrs.MetricsType == storagemetadata.AggregatedMetricsType { + hasAggregatedNamespaces = true + break + } + } + autoMappingRules, err := NewAutoMappingRules(namespaces) if err != nil { logger.Error("could not generate automapping rules for aggregated namespaces."+ @@ -180,5 +202,7 @@ func (d *downsampler) OnUpdate(namespaces m3.ClusterNamespaces) { d.Lock() d.metricsAppenderOpts.defaultStagedMetadatasProtos = defaultStagedMetadatasProtos + // Can only downsample when aggregated namespaces are available. 
+ d.enabled = hasAggregatedNamespaces d.Unlock() } diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 38cf4a3cc9..bc850de531 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -102,6 +102,8 @@ func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *test }, }) + require.False(t, testDownsampler.downsampler.Enabled()) + origStagedMetadata := originalStagedMetadata(t, testDownsampler) session := dbclient.NewMockSession(ctrl) @@ -114,10 +116,47 @@ func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *test waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata) + require.True(t, testDownsampler.downsampler.Enabled()) + // Test expected output testDownsamplerAggregation(t, testDownsampler) } +func TestDownsamplerAggregationToggleEnabled(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{}) + + require.False(t, testDownsampler.downsampler.Enabled()) + + // Add an aggregated namespace and expect downsampler to be enabled. + session := dbclient.NewMockSession(ctrl) + setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{ + NamespaceID: ident.StringID("2s:1d"), + Resolution: 2 * time.Second, + Retention: 24 * time.Hour, + Session: session, + }) + waitForEnabledUpdate(t, &testDownsampler, false) + + require.True(t, testDownsampler.downsampler.Enabled()) + + // Set just an unaggregated namespace and expect downsampler to be disabled. 
+ clusters, err := m3.NewClusters(m3.UnaggregatedClusterNamespaceDefinition{ + NamespaceID: ident.StringID("default"), + Retention: 48 * time.Hour, + Session: session, + }) + require.NoError(t, err) + require.NoError(t, + testDownsampler.opts.ClusterNamespacesWatcher.Update(clusters.ClusterNamespaces())) + + waitForEnabledUpdate(t, &testDownsampler, true) + + require.False(t, testDownsampler.downsampler.Enabled()) +} + func TestDownsamplerAggregationWithRulesStore(t *testing.T) { testDownsampler := newTestDownsampler(t, testDownsamplerOptions{}) rulesStore := testDownsampler.rulesStore @@ -1253,7 +1292,18 @@ func waitForStagedMetadataUpdate(t *testing.T, testDownsampler testDownsampler, return !assert.ObjectsAreEqual(origStagedMetadata, ds.metricsAppenderOpts.defaultStagedMetadatasProtos) }, time.Second)) +} +func waitForEnabledUpdate(t *testing.T, testDownsampler *testDownsampler, current bool) { + ds, ok := testDownsampler.downsampler.(*downsampler) + require.True(t, ok) + + require.True(t, clock.WaitUntil(func() bool { + ds.RLock() + defer ds.RUnlock() + + return current != ds.enabled + }, time.Second)) } type testExpectedWrite struct { diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index 199f3325a1..27cd4b477a 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -200,17 +200,16 @@ func (d *downsamplerAndWriter) shouldDownsample( overrides WriteOptions, ) bool { var ( - downsamplerExists = d.downsampler != nil // If they didn't request the mapping rules to be overridden, then assume they want the default // ones. useDefaultMappingRules = !overrides.DownsampleOverride // If they did try and override the mapping rules, make sure they've provided at least one. 
_, downsampleOverride = d.downsampleOverrideRules(overrides) ) - // Only downsample if the downsampler exists, and they either want to use the default mapping + // Only downsample if the downsampler is enabled, and they either want to use the default mapping // rules, or they're trying to override the mapping rules and they've provided at least one // override to do so. - return downsamplerExists && (useDefaultMappingRules || downsampleOverride) + return d.downsampler.Enabled() && (useDefaultMappingRules || downsampleOverride) } func (d *downsamplerAndWriter) downsampleOverrideRules( diff --git a/src/cmd/services/m3coordinator/ingest/write_test.go b/src/cmd/services/m3coordinator/ingest/write_test.go index 243d8414dc..ae798c6825 100644 --- a/src/cmd/services/m3coordinator/ingest/write_test.go +++ b/src/cmd/services/m3coordinator/ingest/write_test.go @@ -437,9 +437,8 @@ func TestDownsampleAndWriteNoDownsampler(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - downAndWrite, _, session := newTestDownsamplerAndWriter(t, ctrl, + downAndWrite, _, session := newTestDownsamplerAndWriterWithEnabled(t, ctrl, false, testDownsamplerAndWriterOptions{}) - downAndWrite.downsampler = nil expectDefaultStorageWrites(session, testDatapoints1, testAnnotation1) @@ -653,9 +652,8 @@ func TestDownsampleAndWriteBatchNoDownsampler(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - downAndWrite, _, session := newTestDownsamplerAndWriter(t, ctrl, + downAndWrite, _, session := newTestDownsamplerAndWriterWithEnabled(t, ctrl, false, testDownsamplerAndWriterOptions{}) - downAndWrite.downsampler = nil for _, entry := range testEntries { for _, dp := range entry.datapoints { @@ -811,6 +809,15 @@ func newTestDownsamplerAndWriter( t *testing.T, ctrl *gomock.Controller, opts testDownsamplerAndWriterOptions, +) (*downsamplerAndWriter, *downsample.MockDownsampler, *client.MockSession) { + return newTestDownsamplerAndWriterWithEnabled(t, ctrl, true, opts) +} + +func 
newTestDownsamplerAndWriterWithEnabled( + t *testing.T, + ctrl *gomock.Controller, + enabled bool, + opts testDownsamplerAndWriterOptions, ) (*downsamplerAndWriter, *downsample.MockDownsampler, *client.MockSession) { var ( storage storage.Storage @@ -822,6 +829,7 @@ func newTestDownsamplerAndWriter( storage, session = testm3.NewStorageAndSession(t, ctrl) } downsampler := downsample.NewMockDownsampler(ctrl) + downsampler.EXPECT().Enabled().Return(enabled) return NewDownsamplerAndWriter(storage, downsampler, testWorkerPool, instrument.NewOptions()).(*downsamplerAndWriter), downsampler, session } @@ -833,6 +841,7 @@ func newTestDownsamplerAndWriterWithAggregatedNamespace( storage, session := testm3.NewStorageAndSessionWithAggregatedNamespaces( t, ctrl, aggregatedNamespaces) downsampler := downsample.NewMockDownsampler(ctrl) + downsampler.EXPECT().Enabled().Return(true) return NewDownsamplerAndWriter(storage, downsampler, testWorkerPool, instrument.NewOptions()).(*downsamplerAndWriter), downsampler, session } diff --git a/src/dbnode/config/m3dbnode-local-etcd-proto.yml b/src/dbnode/config/m3dbnode-local-etcd-proto.yml index 3b49b3574f..09836779be 100644 --- a/src/dbnode/config/m3dbnode-local-etcd-proto.yml +++ b/src/dbnode/config/m3dbnode-local-etcd-proto.yml @@ -1,9 +1,4 @@ -coordinator: - local: - namespaces: - - namespace: default - type: unaggregated - retention: 48h +coordinator: {} db: hostID: diff --git a/src/dbnode/config/m3dbnode-local-etcd.yml b/src/dbnode/config/m3dbnode-local-etcd.yml index 51019f02d4..5ebf51b66e 100644 --- a/src/dbnode/config/m3dbnode-local-etcd.yml +++ b/src/dbnode/config/m3dbnode-local-etcd.yml @@ -1,9 +1,4 @@ -coordinator: - local: - namespaces: - - namespace: default - type: unaggregated - retention: 48h +coordinator: {} db: hostID: diff --git a/src/query/server/query.go b/src/query/server/query.go index 0d44c0bbc4..e30dd1ded0 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -640,45 +640,40 @@ func 
newM3DBStorage( namespaces = clusters.ClusterNamespaces() downsampler downsample.Downsampler ) - if n := namespaces.NumAggregatedClusterNamespaces(); n > 0 { - logger.Info("configuring downsampler to use with aggregated cluster namespaces", - zap.Int("numAggregatedClusterNamespaces", n)) + logger.Info("configuring downsampler to use with aggregated cluster namespaces", + zap.Int("numAggregatedClusterNamespaces", len(namespaces))) + + newDownsamplerFn := func() (downsample.Downsampler, error) { + ds, err := newDownsampler( + cfg.Downsample, clusterClient, + fanoutStorage, clusterNamespacesWatcher, + tsdbOpts.TagOptions(), instrumentOptions, rwOpts) if err != nil { - return nil, nil, nil, nil, err + return nil, err } - newDownsamplerFn := func() (downsample.Downsampler, error) { - downsampler, err := newDownsampler( - cfg.Downsample, clusterClient, - fanoutStorage, clusterNamespacesWatcher, - tsdbOpts.TagOptions(), instrumentOptions, rwOpts) - if err != nil { - return nil, err - } - - // Notify the downsampler ready channel that - // the downsampler has now been created and is ready. - if downsamplerReadyCh != nil { - downsamplerReadyCh <- struct{}{} - } - - return downsampler, nil + // Notify the downsampler ready channel that + // the downsampler has now been created and is ready. 
+ if downsamplerReadyCh != nil { + downsamplerReadyCh <- struct{}{} } - if clusterClientWaitCh != nil { - // Need to wait before constructing and instead return an async downsampler - // since the cluster client will return errors until it's initialized itself - // and will fail constructing the downsampler consequently - downsampler = downsample.NewAsyncDownsampler(func() (downsample.Downsampler, error) { - <-clusterClientWaitCh - return newDownsamplerFn() - }, nil) - } else { - // Otherwise we already have a client and can immediately construct the downsampler - downsampler, err = newDownsamplerFn() - if err != nil { - return nil, nil, nil, nil, err - } + return ds, nil + } + + if clusterClientWaitCh != nil { + // Need to wait before constructing and instead return an async downsampler + // since the cluster client will return errors until it's initialized itself + // and will fail constructing the downsampler consequently + downsampler = downsample.NewAsyncDownsampler(func() (downsample.Downsampler, error) { + <-clusterClientWaitCh + return newDownsamplerFn() + }, nil) + } else { + // Otherwise we already have a client and can immediately construct the downsampler + downsampler, err = newDownsamplerFn() + if err != nil { + return nil, nil, nil, nil, err } }