diff --git a/scripts/docker-integration-tests/aggregator/m3aggregator.yml b/scripts/docker-integration-tests/aggregator/m3aggregator.yml index 49a13a770b..58d0370d4e 100644 --- a/scripts/docker-integration-tests/aggregator/m3aggregator.yml +++ b/scripts/docker-integration-tests/aggregator/m3aggregator.yml @@ -71,7 +71,7 @@ rawtcp: kvClient: etcd: - env: default_env + env: override_test_env zone: embedded service: m3aggregator cacheDir: /var/lib/m3kv @@ -82,7 +82,7 @@ kvClient: runtimeOptions: kvConfig: - environment: default_env + environment: override_test_env zone: embedded writeValuesPerMetricLimitPerSecondKey: write-values-per-metric-limit-per-second writeValuesPerMetricLimitPerSecond: 0 @@ -131,7 +131,7 @@ aggregator: placementKV: namespace: /placement zone: embedded - environment: default_env + environment: override_test_env placementWatcher: key: m3aggregator initWatchTimeout: 15s @@ -164,7 +164,7 @@ aggregator: placementManager: kvConfig: namespace: /placement - environment: default_env + environment: override_test_env zone: embedded placementWatcher: key: m3aggregator @@ -175,7 +175,7 @@ aggregator: resignTimeout: 1m flushTimesManager: kvConfig: - environment: default_env + environment: override_test_env zone: embedded flushTimesKeyFmt: shardset/%d/flush flushTimesPersistRetrier: @@ -190,7 +190,7 @@ aggregator: ttlSeconds: 10 serviceID: name: m3aggregator - environment: default_env + environment: override_test_env zone: embedded electionKeyFmt: shardset/%d/lock campaignRetrier: @@ -243,7 +243,7 @@ aggregator: topicName: aggregated_metrics topicServiceOverride: zone: embedded - environment: default_env + environment: override_test_env messageRetry: initialBackoff: 1m maxBackoff: 2m diff --git a/scripts/docker-integration-tests/aggregator/m3coordinator.yml b/scripts/docker-integration-tests/aggregator/m3coordinator.yml index 2b1c9b68b7..475afe191f 100644 --- a/scripts/docker-integration-tests/aggregator/m3coordinator.yml +++ b/scripts/docker-integration-tests/aggregator/m3coordinator.yml @@ -56,6 +56,7 @@ downsample: client: placementKV: namespace: /placement + environment: override_test_env placementWatcher: key: m3aggregator initWatchTimeout: 10s diff --git a/scripts/docker-integration-tests/aggregator/test.sh b/scripts/docker-integration-tests/aggregator/test.sh index 32ce73f9e2..e0b8d7ce13 100755 --- a/scripts/docker-integration-tests/aggregator/test.sh +++ b/scripts/docker-integration-tests/aggregator/test.sh @@ -25,7 +25,7 @@ echo "Setup DB node" setup_single_m3db_node echo "Initializing aggregator topology" -curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init -d '{ +curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/services/m3aggregator/placement/init -d '{ "num_shards": 64, "replication_factor": 2, "instances": [ @@ -51,7 +51,7 @@ curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init }' echo "Initializing m3msg topic for m3coordinator ingestion from m3aggregators" -curl -vvvsSf -X POST localhost:7201/api/v1/topic/init -d '{ +curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{ "numberOfShards": 64 }' @@ -75,7 +75,7 @@ echo "Done validating topology" # Do this after placement for m3coordinator is created. echo "Adding m3coordinator as a consumer to the aggregator topic" -curl -vvvsSf -X POST localhost:7201/api/v1/topic -d '{ +curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{ "consumerService": { "serviceId": { "name": "m3coordinator", diff --git a/src/query/api/v1/handler/topic/add.go b/src/query/api/v1/handler/topic/add.go index a24d4bef30..9f37115b9a 100644 --- a/src/query/api/v1/handler/topic/add.go +++ b/src/query/api/v1/handler/topic/add.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" @@ -73,7 +74,9 @@ func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - service, err := h.serviceFn(h.client) + serviceCfg := handleroptions.ServiceNameAndDefaults{} + svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil) + service, err := h.serviceFn(h.client, svcOpts) if err != nil { logger.Error("unable to get service", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) diff --git a/src/query/api/v1/handler/topic/common.go b/src/query/api/v1/handler/topic/common.go index 6154085ff8..1ae2a1bcc6 100644 --- a/src/query/api/v1/handler/topic/common.go +++ b/src/query/api/v1/handler/topic/common.go @@ -25,8 +25,10 @@ import ( "strings" clusterclient "github.com/m3db/m3/src/cluster/client" + "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" @@ -43,7 +45,7 @@ const ( HeaderTopicName = "topic-name" ) -type serviceFn func(clusterClient clusterclient.Client) (topic.Service, error) +type serviceFn func(clusterClient clusterclient.Client, opts handleroptions.ServiceOptions) (topic.Service, error) // Handler represents a generic handler for topic endpoints. // nolint: structcheck @@ -57,11 +59,14 @@ type Handler struct { } // Service gets a topic service from m3cluster client -func Service(clusterClient clusterclient.Client) (topic.Service, error) { - return topic.NewService( - topic.NewServiceOptions(). - SetConfigService(clusterClient), - ) +func Service(clusterClient clusterclient.Client, opts handleroptions.ServiceOptions) (topic.Service, error) { + kvOverride := kv.NewOverrideOptions(). + SetEnvironment(opts.ServiceEnvironment). + SetZone(opts.ServiceZone) + topicOpts := topic.NewServiceOptions(). + SetConfigService(clusterClient). + SetKVOverrideOptions(kvOverride) + return topic.NewService(topicOpts) } // RegisterRoutes registers the topic routes diff --git a/src/query/api/v1/handler/topic/common_test.go b/src/query/api/v1/handler/topic/common_test.go index 62b4580276..77ba308bb2 100644 --- a/src/query/api/v1/handler/topic/common_test.go +++ b/src/query/api/v1/handler/topic/common_test.go @@ -26,6 +26,7 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/msg/generated/proto/topicpb" "github.com/m3db/m3/src/msg/topic" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/gogo/protobuf/jsonpb" "github.com/golang/mock/gomock" @@ -46,7 +47,7 @@ func validateEqualTopicProto(t *testing.T, this, other topicpb.Topic) { } func testServiceFn(s topic.Service) serviceFn { - return func(clusterClient clusterclient.Client) (topic.Service, error) { + return func(clusterClient clusterclient.Client, opts handleroptions.ServiceOptions) (topic.Service, error) { return s, nil } } diff --git a/src/query/api/v1/handler/topic/delete.go b/src/query/api/v1/handler/topic/delete.go index c7056f7abb..6af4cc1c10 100644 --- a/src/query/api/v1/handler/topic/delete.go +++ b/src/query/api/v1/handler/topic/delete.go @@ -26,6 +26,7 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" @@ -65,7 +66,9 @@ func (h *DeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { logger = logging.WithContext(ctx, h.instrumentOpts) ) - service, err := h.serviceFn(h.client) + serviceCfg := handleroptions.ServiceNameAndDefaults{} + svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil) + service, err := h.serviceFn(h.client, svcOpts) if err != nil { logger.Error("unable to get service", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) diff --git a/src/query/api/v1/handler/topic/get.go b/src/query/api/v1/handler/topic/get.go index 095a9a11c0..b6b8d06972 100644 --- a/src/query/api/v1/handler/topic/get.go +++ b/src/query/api/v1/handler/topic/get.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" @@ -66,7 +67,9 @@ func (h *GetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { logger = logging.WithContext(ctx, h.instrumentOpts) ) - service, err := h.serviceFn(h.client) + serviceCfg := handleroptions.ServiceNameAndDefaults{} + svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil) + service, err := h.serviceFn(h.client, svcOpts) if err != nil { logger.Error("unable to get service", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) diff --git a/src/query/api/v1/handler/topic/init.go b/src/query/api/v1/handler/topic/init.go index c041c28c10..41e186fb25 100644 --- a/src/query/api/v1/handler/topic/init.go +++ b/src/query/api/v1/handler/topic/init.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" @@ -73,7 +74,9 @@ func (h *InitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - service, err := h.serviceFn(h.client) + serviceCfg := handleroptions.ServiceNameAndDefaults{} + svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil) + service, err := h.serviceFn(h.client, svcOpts) if err != nil { logger.Error("unable to get service", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError)