Skip to content

Commit

Permalink
[query] api: Respect env+zone headers in topic (#2159)
Browse files Browse the repository at this point in the history
  • Loading branch information
schallert authored Feb 21, 2020
1 parent 7fdea6b commit 4cd86c7
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 21 deletions.
14 changes: 7 additions & 7 deletions scripts/docker-integration-tests/aggregator/m3aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ rawtcp:

kvClient:
etcd:
env: default_env
env: override_test_env
zone: embedded
service: m3aggregator
cacheDir: /var/lib/m3kv
Expand All @@ -82,7 +82,7 @@ kvClient:

runtimeOptions:
kvConfig:
environment: default_env
environment: override_test_env
zone: embedded
writeValuesPerMetricLimitPerSecondKey: write-values-per-metric-limit-per-second
writeValuesPerMetricLimitPerSecond: 0
Expand Down Expand Up @@ -131,7 +131,7 @@ aggregator:
placementKV:
namespace: /placement
zone: embedded
environment: default_env
environment: override_test_env
placementWatcher:
key: m3aggregator
initWatchTimeout: 15s
Expand Down Expand Up @@ -164,7 +164,7 @@ aggregator:
placementManager:
kvConfig:
namespace: /placement
environment: default_env
environment: override_test_env
zone: embedded
placementWatcher:
key: m3aggregator
Expand All @@ -175,7 +175,7 @@ aggregator:
resignTimeout: 1m
flushTimesManager:
kvConfig:
environment: default_env
environment: override_test_env
zone: embedded
flushTimesKeyFmt: shardset/%d/flush
flushTimesPersistRetrier:
Expand All @@ -190,7 +190,7 @@ aggregator:
ttlSeconds: 10
serviceID:
name: m3aggregator
environment: default_env
environment: override_test_env
zone: embedded
electionKeyFmt: shardset/%d/lock
campaignRetrier:
Expand Down Expand Up @@ -243,7 +243,7 @@ aggregator:
topicName: aggregated_metrics
topicServiceOverride:
zone: embedded
environment: default_env
environment: override_test_env
messageRetry:
initialBackoff: 1m
maxBackoff: 2m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ downsample:
client:
placementKV:
namespace: /placement
environment: override_test_env
placementWatcher:
key: m3aggregator
initWatchTimeout: 10s
Expand Down
6 changes: 3 additions & 3 deletions scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ echo "Setup DB node"
setup_single_m3db_node

echo "Initializing aggregator topology"
curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init -d '{
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/services/m3aggregator/placement/init -d '{
"num_shards": 64,
"replication_factor": 2,
"instances": [
Expand All @@ -51,7 +51,7 @@ curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init
}'

echo "Initializing m3msg topic for m3coordinator ingestion from m3aggregators"
curl -vvvsSf -X POST localhost:7201/api/v1/topic/init -d '{
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{
"numberOfShards": 64
}'

Expand All @@ -75,7 +75,7 @@ echo "Done validating topology"

# Do this after placement for m3coordinator is created.
echo "Adding m3coordinator as a consumer to the aggregator topic"
curl -vvvsSf -X POST localhost:7201/api/v1/topic -d '{
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{
"consumerService": {
"serviceId": {
"name": "m3coordinator",
Expand Down
5 changes: 4 additions & 1 deletion src/query/api/v1/handler/topic/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/instrument"
Expand Down Expand Up @@ -73,7 +74,9 @@ func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

service, err := h.serviceFn(h.client)
serviceCfg := handleroptions.ServiceNameAndDefaults{}
svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil)
service, err := h.serviceFn(h.client, svcOpts)
if err != nil {
logger.Error("unable to get service", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
Expand Down
17 changes: 11 additions & 6 deletions src/query/api/v1/handler/topic/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"strings"

clusterclient "github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/instrument"
xhttp "github.com/m3db/m3/src/x/net/http"
Expand All @@ -43,7 +45,7 @@ const (
HeaderTopicName = "topic-name"
)

type serviceFn func(clusterClient clusterclient.Client) (topic.Service, error)
type serviceFn func(clusterClient clusterclient.Client, opts handleroptions.ServiceOptions) (topic.Service, error)

// Handler represents a generic handler for topic endpoints.
// nolint: structcheck
Expand All @@ -57,11 +59,14 @@ type Handler struct {
}

// Service gets a topic service from m3cluster client
func Service(clusterClient clusterclient.Client) (topic.Service, error) {
return topic.NewService(
topic.NewServiceOptions().
SetConfigService(clusterClient),
)
func Service(clusterClient clusterclient.Client, opts handleroptions.ServiceOptions) (topic.Service, error) {
kvOverride := kv.NewOverrideOptions().
SetEnvironment(opts.ServiceEnvironment).
SetZone(opts.ServiceZone)
topicOpts := topic.NewServiceOptions().
SetConfigService(clusterClient).
SetKVOverrideOptions(kvOverride)
return topic.NewService(topicOpts)
}

// RegisterRoutes registers the topic routes
Expand Down
3 changes: 2 additions & 1 deletion src/query/api/v1/handler/topic/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
clusterclient "github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"
"github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"

"github.com/gogo/protobuf/jsonpb"
"github.com/golang/mock/gomock"
Expand All @@ -46,7 +47,7 @@ func validateEqualTopicProto(t *testing.T, this, other topicpb.Topic) {
}

func testServiceFn(s topic.Service) serviceFn {
return func(clusterClient clusterclient.Client) (topic.Service, error) {
return func(clusterClient clusterclient.Client, opts handleroptions.ServiceOptions) (topic.Service, error) {
return s, nil
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/query/api/v1/handler/topic/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
clusterclient "github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/instrument"
xhttp "github.com/m3db/m3/src/x/net/http"
Expand Down Expand Up @@ -65,7 +66,9 @@ func (h *DeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger = logging.WithContext(ctx, h.instrumentOpts)
)

service, err := h.serviceFn(h.client)
serviceCfg := handleroptions.ServiceNameAndDefaults{}
svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil)
service, err := h.serviceFn(h.client, svcOpts)
if err != nil {
logger.Error("unable to get service", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
Expand Down
5 changes: 4 additions & 1 deletion src/query/api/v1/handler/topic/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/instrument"
Expand Down Expand Up @@ -66,7 +67,9 @@ func (h *GetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger = logging.WithContext(ctx, h.instrumentOpts)
)

service, err := h.serviceFn(h.client)
serviceCfg := handleroptions.ServiceNameAndDefaults{}
svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil)
service, err := h.serviceFn(h.client, svcOpts)
if err != nil {
logger.Error("unable to get service", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
Expand Down
5 changes: 4 additions & 1 deletion src/query/api/v1/handler/topic/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/instrument"
Expand Down Expand Up @@ -73,7 +74,9 @@ func (h *InitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

service, err := h.serviceFn(h.client)
serviceCfg := handleroptions.ServiceNameAndDefaults{}
svcOpts := handleroptions.NewServiceOptions(serviceCfg, r.Header, nil)
service, err := h.serviceFn(h.client, svcOpts)
if err != nil {
logger.Error("unable to get service", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
Expand Down

0 comments on commit 4cd86c7

Please sign in to comment.