From 8b1fee8d66f6432cb90bd4e99928d978ac2519dc Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Sun, 3 Mar 2019 21:31:39 -0500 Subject: [PATCH 1/3] [api] support zone/env overrides in ns handler The placement APIs support zone/environment overrides via HTTP headers. This adds that functionality to the namespace APIs so users can manage multiple M3DB clusters from a single coordinator. --- src/query/api/v1/handler/database/create.go | 9 +- .../api/v1/handler/database/create_test.go | 8 + src/query/api/v1/handler/headers.go | 14 ++ src/query/api/v1/handler/namespace/add.go | 12 +- .../api/v1/handler/namespace/add_test.go | 2 + src/query/api/v1/handler/options.go | 135 ++++++++++++++++ src/query/api/v1/handler/options_test.go | 98 ++++++++++++ src/query/api/v1/handler/placement/add.go | 4 +- .../api/v1/handler/placement/add_test.go | 23 +-- src/query/api/v1/handler/placement/common.go | 151 ++++-------------- .../api/v1/handler/placement/common_test.go | 17 +- src/query/api/v1/handler/placement/delete.go | 4 +- .../api/v1/handler/placement/delete_all.go | 4 +- .../api/v1/handler/placement/delete_test.go | 21 +-- src/query/api/v1/handler/placement/get.go | 2 +- src/query/api/v1/handler/placement/init.go | 4 +- .../api/v1/handler/placement/init_test.go | 11 +- src/query/api/v1/handler/placement/replace.go | 6 +- .../api/v1/handler/placement/replace_test.go | 17 +- src/query/api/v1/httpd/handler.go | 6 +- src/x/serialize/serialize_mock.go | 2 +- 21 files changed, 361 insertions(+), 189 deletions(-) create mode 100644 src/query/api/v1/handler/options.go create mode 100644 src/query/api/v1/handler/options_test.go diff --git a/src/query/api/v1/handler/database/create.go b/src/query/api/v1/handler/database/create.go index 5587d38082..b1b9a11033 100644 --- a/src/query/api/v1/handler/database/create.go +++ b/src/query/api/v1/handler/database/create.go @@ -41,7 +41,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/placement" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "github.com/golang/protobuf/jsonpb" "go.uber.org/zap" @@ -162,7 +162,7 @@ func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { logger = logging.WithContext(ctx) ) - currPlacement, _, err := h.placementGetHandler.Get(placement.M3DBServiceName, nil) + currPlacement, _, err := h.placementGetHandler.Get(handler.M3DBServiceName, nil) if err != nil { logger.Error("unable to get placement", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) @@ -205,7 +205,8 @@ func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - nsRegistry, err = h.namespaceAddHandler.Add(namespaceRequest) + opts := handler.NewServiceOptions("kv", r.Header, nil) + nsRegistry, err = h.namespaceAddHandler.Add(namespaceRequest, opts) if err != nil { logger.Error("unable to add namespace", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) @@ -241,7 +242,7 @@ func (h *createHandler) maybeInitPlacement( // If we're here then there is no existing placement, so just create it. This is safe because in // the case where a placement did not already exist, the parse function above validated that we // have all the required information to create a placement. - newPlacement, err := h.placementInitHandler.Init(placement.M3DBServiceName, r, placementRequest) + newPlacement, err := h.placementInitHandler.Init(handler.M3DBServiceName, r, placementRequest) if err != nil { return nil, false, err } diff --git a/src/query/api/v1/handler/database/create_test.go b/src/query/api/v1/handler/database/create_test.go index c5afa72dbc..7164f06e92 100644 --- a/src/query/api/v1/handler/database/create_test.go +++ b/src/query/api/v1/handler/database/create_test.go @@ -92,6 +92,7 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) { defer ctrl.Finish() mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() @@ -245,6 +246,7 @@ func TestLocalTypeWithNumShards(t *testing.T) { defer ctrl.Finish() mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() @@ -349,6 +351,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) { defer ctrl.Finish() mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() @@ -454,6 +457,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { defer ctrl.Finish() mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() @@ -571,6 +575,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsHostsProvided(t *testing.T) { defer ctrl.Finish() mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockClient.EXPECT().Store(gomock.Any()).Return(nil, nil).AnyTimes() createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() @@ -669,6 +674,7 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) { defer ctrl.Finish() mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() @@ -810,6 +816,8 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { defer ctrl.Finish() mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil) + createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() diff --git a/src/query/api/v1/handler/headers.go b/src/query/api/v1/handler/headers.go index 205951cde8..e44a6aa7eb 100644 --- a/src/query/api/v1/handler/headers.go +++ b/src/query/api/v1/handler/headers.go @@ -32,4 +32,18 @@ const ( // DeprecatedHeader is the M3 deprecated header DeprecatedHeader = "M3-Deprecated" + + // DefaultServiceEnvironment is the default service ID environment. + DefaultServiceEnvironment = "default_env" + // DefaultServiceZone is the default service ID zone. + DefaultServiceZone = "embedded" + + // HeaderClusterEnvironmentName is the header used to specify the environment + // name. + HeaderClusterEnvironmentName = "Cluster-Environment-Name" + // HeaderClusterZoneName is the header used to specify the zone name. + HeaderClusterZoneName = "Cluster-Zone-Name" + // HeaderDryRun is the header used to specify whether this should be a dry + // run. + HeaderDryRun = "Dry-Run" ) diff --git a/src/query/api/v1/handler/namespace/add.go b/src/query/api/v1/handler/namespace/add.go index db599b82fc..7a5b606325 100644 --- a/src/query/api/v1/handler/namespace/add.go +++ b/src/query/api/v1/handler/namespace/add.go @@ -28,6 +28,7 @@ import ( "path" clusterclient "github.com/m3db/m3/src/cluster/client" + "github.com/m3db/m3/src/cluster/kv" nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/query/api/v1/handler" @@ -72,7 +73,8 @@ func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - nsRegistry, err := h.Add(md) + opts := handler.NewServiceOptions("kv", r.Header, nil) + nsRegistry, err := h.Add(md, opts) if err != nil { if err == errNamespaceExists { logger.Error("namespace already exists", zap.Error(err)) @@ -108,7 +110,7 @@ func (h *AddHandler) parseRequest(r *http.Request) (*admin.NamespaceAddRequest, } // Add adds a namespace. -func (h *AddHandler) Add(addReq *admin.NamespaceAddRequest) (nsproto.Registry, error) { +func (h *AddHandler) Add(addReq *admin.NamespaceAddRequest, opts handler.ServiceOptions) (nsproto.Registry, error) { var emptyReg = nsproto.Registry{} md, err := namespace.ToMetadata(addReq.Name, addReq.Options) @@ -116,7 +118,11 @@ func (h *AddHandler) Add(addReq *admin.NamespaceAddRequest) (nsproto.Registry, e return emptyReg, fmt.Errorf("unable to get metadata: %v", err) } - store, err := h.client.KV() + kvOpts := kv.NewOverrideOptions(). + SetEnvironment(opts.ServiceEnvironment). + SetZone(opts.ServiceZone) + + store, err := h.client.Store(kvOpts) if err != nil { return emptyReg, err } diff --git a/src/query/api/v1/handler/namespace/add_test.go b/src/query/api/v1/handler/namespace/add_test.go index 23973ed1b3..e3faae9c8c 100644 --- a/src/query/api/v1/handler/namespace/add_test.go +++ b/src/query/api/v1/handler/namespace/add_test.go @@ -64,6 +64,7 @@ const testAddJSON = ` func TestNamespaceAddHandler(t *testing.T) { mockClient, mockKV, _ := SetupNamespaceTest(t) addHandler := NewAddHandler(mockClient) + mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil) // Error case where required fields are not set w := httptest.NewRecorder() @@ -108,6 +109,7 @@ func TestNamespaceAddHandler(t *testing.T) { func TestNamespaceAddHandler_Conflict(t *testing.T) { mockClient, mockKV, ctrl := SetupNamespaceTest(t) addHandler := NewAddHandler(mockClient) + mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil) // Ensure adding an existing namespace returns 409 req := httptest.NewRequest("POST", "/namespace", strings.NewReader(testAddJSON)) diff --git a/src/query/api/v1/handler/options.go b/src/query/api/v1/handler/options.go new file mode 100644 index 0000000000..f03ca68e20 --- /dev/null +++ b/src/query/api/v1/handler/options.go @@ -0,0 +1,135 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package handler + +import ( + "errors" + "net/http" + "strings" + "time" + + "github.com/m3db/m3/src/cluster/services" +) + +const ( + // M3DBServiceName is the service name for M3DB. + M3DBServiceName = "m3db" + // M3AggregatorServiceName is the service name for M3Aggregator. + M3AggregatorServiceName = "m3aggregator" + // M3CoordinatorServiceName is the service name for M3Coordinator. + M3CoordinatorServiceName = "m3coordinator" + + defaultM3AggMaxAggregationWindowSize = time.Minute + // defaultM3AggWarmupDuration configures the buffer to account for the delay + // of propagating aggregator placement to clients, usually needed when there is + // a large amount of clients sending traffic to m3aggregator. + defaultM3AggWarmupDuration = 0 +) + +var ( + errServiceNameIsRequired = errors.New("service name is required") + errServiceEnvironmentIsRequired = errors.New("service environment is required") + errServiceZoneIsRequired = errors.New("service zone is required") + errM3AggServiceOptionsRequired = errors.New("m3agg service options are required") +) + +// ServiceOptions are the options for Service. +type ServiceOptions struct { + ServiceName string + ServiceEnvironment string + ServiceZone string + + M3Agg *M3AggServiceOptions + + DryRun bool +} + +// M3AggServiceOptions contains the service options that are +// specific to the M3Agg service. +type M3AggServiceOptions struct { + MaxAggregationWindowSize time.Duration + WarmupDuration time.Duration +} + +// NewServiceOptions returns a ServiceOptions based on the provided +// values. +func NewServiceOptions( + serviceName string, headers http.Header, m3AggOpts *M3AggServiceOptions) ServiceOptions { + opts := ServiceOptions{ + ServiceName: serviceName, + ServiceEnvironment: DefaultServiceEnvironment, + ServiceZone: DefaultServiceZone, + + DryRun: false, + + M3Agg: &M3AggServiceOptions{ + MaxAggregationWindowSize: defaultM3AggMaxAggregationWindowSize, + WarmupDuration: defaultM3AggWarmupDuration, + }, + } + + if v := strings.TrimSpace(headers.Get(HeaderClusterEnvironmentName)); v != "" { + opts.ServiceEnvironment = v + } + if v := strings.TrimSpace(headers.Get(HeaderClusterZoneName)); v != "" { + opts.ServiceZone = v + } + if v := strings.TrimSpace(headers.Get(HeaderDryRun)); v == "true" { + opts.DryRun = true + } + + if m3AggOpts != nil { + if m3AggOpts.MaxAggregationWindowSize > 0 { + opts.M3Agg.MaxAggregationWindowSize = m3AggOpts.MaxAggregationWindowSize + } + + if m3AggOpts.WarmupDuration > 0 { + opts.M3Agg.WarmupDuration = m3AggOpts.WarmupDuration + } + } + + return opts +} + +// Validate ensures the service options are valid. +func (opts *ServiceOptions) Validate() error { + if opts.ServiceName == "" { + return errServiceNameIsRequired + } + if opts.ServiceEnvironment == "" { + return errServiceEnvironmentIsRequired + } + if opts.ServiceZone == "" { + return errServiceZoneIsRequired + } + if opts.ServiceName == M3AggregatorServiceName && opts.M3Agg == nil { + return errM3AggServiceOptionsRequired + } + return nil +} + +// ServiceID constructs a cluster services ID from the options. +func (opts *ServiceOptions) ServiceID() services.ServiceID { + return services.NewServiceID(). + SetName(opts.ServiceName). + SetEnvironment(opts.ServiceEnvironment). + SetZone(opts.ServiceZone) +} diff --git a/src/query/api/v1/handler/options_test.go b/src/query/api/v1/handler/options_test.go new file mode 100644 index 0000000000..3507f36310 --- /dev/null +++ b/src/query/api/v1/handler/options_test.go @@ -0,0 +1,98 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package handler + +import ( + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewServiceOptions(t *testing.T) { + tests := []struct { + service string + headers map[string]string + aggOpts *M3AggServiceOptions + exp ServiceOptions + }{ + { + service: "foo", + exp: ServiceOptions{ + ServiceName: "foo", + ServiceEnvironment: DefaultServiceEnvironment, + ServiceZone: DefaultServiceZone, + M3Agg: &M3AggServiceOptions{ + MaxAggregationWindowSize: time.Minute, + }, + }, + }, + { + service: "foo", + headers: map[string]string{ + HeaderClusterEnvironmentName: "bar", + HeaderClusterZoneName: "baz", + HeaderDryRun: "true", + }, + aggOpts: &M3AggServiceOptions{ + MaxAggregationWindowSize: 2 * time.Minute, + WarmupDuration: time.Minute, + }, + exp: ServiceOptions{ + ServiceName: "foo", + ServiceEnvironment: "bar", + ServiceZone: "baz", + DryRun: true, + M3Agg: &M3AggServiceOptions{ + MaxAggregationWindowSize: 2 * time.Minute, + WarmupDuration: time.Minute, + }, + }, + }, + } + + for _, test := range tests { + h := http.Header{} + for k, v := range test.headers { + h.Add(k, v) + } + opts := NewServiceOptions(test.service, h, test.aggOpts) + assert.Equal(t, test.exp, opts) + } +} + +func TestServiceOptionsValidate(t *testing.T) { + opts := &ServiceOptions{} + assert.Error(t, opts.Validate()) + opts.ServiceName = "foo" + assert.Error(t, opts.Validate()) + opts.ServiceEnvironment = "foo" + assert.Error(t, opts.Validate()) + opts.ServiceZone = "foo" + assert.NoError(t, opts.Validate()) + + opts.ServiceName = M3AggregatorServiceName + assert.Error(t, opts.Validate()) + + opts.M3Agg = &M3AggServiceOptions{} + assert.NoError(t, opts.Validate()) +} diff --git a/src/query/api/v1/handler/placement/add.go b/src/query/api/v1/handler/placement/add.go index 6925a5a910..408bbdfc9c 100644 --- a/src/query/api/v1/handler/placement/add.go +++ b/src/query/api/v1/handler/placement/add.go @@ -29,7 +29,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "github.com/gogo/protobuf/jsonpb" "go.uber.org/zap" @@ -123,7 +123,7 @@ func (h *AddHandler) Add( return nil, err } - serviceOpts := NewServiceOptions( + serviceOpts := handler.NewServiceOptions( serviceName, httpReq.Header, h.M3AggServiceOptions) var validateFn placement.ValidateFn if !req.Force { diff --git a/src/query/api/v1/handler/placement/add_test.go b/src/query/api/v1/handler/placement/add_test.go index 03f6416e03..754dc9fd28 100644 --- a/src/query/api/v1/handler/placement/add_test.go +++ b/src/query/api/v1/handler/placement/add_test.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cmd/services/m3query/config" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -55,7 +56,7 @@ func TestPlacementAddHandler_Force(t *testing.T) { w = httptest.NewRecorder() req *http.Request ) - if serviceName == M3AggregatorServiceName { + if serviceName == apihandler.M3AggregatorServiceName { req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"force": true, "instances":[]}`)) } else { req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"force": true, "instances":[]}`)) @@ -72,7 +73,7 @@ func TestPlacementAddHandler_Force(t *testing.T) { // Test add success w = httptest.NewRecorder() - if serviceName == M3AggregatorServiceName { + if serviceName == apihandler.M3AggregatorServiceName { req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"force": true, "instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`)) } else { req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"force": true, "instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`)) @@ -107,7 +108,7 @@ func TestPlacementAddHandler_SafeErr_NoNewInstance(t *testing.T) { w = httptest.NewRecorder() req *http.Request ) - if serviceName == M3AggregatorServiceName { + if serviceName == apihandler.M3AggregatorServiceName { req = httptest.NewRequest(AddHTTPMethod, M3AggAddURL, strings.NewReader(`{"instances":[]}`)) } else { req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"instances":[]}`)) @@ -140,7 +141,7 @@ func TestPlacementAddHandler_SafeErr_NotAllAvailable(t *testing.T) { w = httptest.NewRecorder() req *http.Request ) - if serviceName == M3AggregatorServiceName { + if serviceName == apihandler.M3AggregatorServiceName { req = httptest.NewRequest(AddHTTPMethod, M3AggAddURL, strings.NewReader(`{"instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`)) } else { req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`)) @@ -175,7 +176,7 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) { req *http.Request ) switch serviceName { - case M3AggregatorServiceName: + case apihandler.M3AggregatorServiceName: req = httptest.NewRequest(AddHTTPMethod, M3AggAddURL, strings.NewReader(`{"instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`)) default: req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`)) @@ -190,14 +191,14 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) { ) switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: existingPlacement = existingPlacement. SetIsSharded(false). SetReplicaFactor(1) newPlacement = existingPlacement. SetIsSharded(false). SetReplicaFactor(1) - case M3AggregatorServiceName: + case apihandler.M3AggregatorServiceName: existingPlacement = existingPlacement. SetIsMirrored(true). SetReplicaFactor(1) @@ -215,7 +216,7 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) { require.Equal(t, `{"error":"test err"}`+"\n", string(body)) w = httptest.NewRecorder() - if serviceName == M3AggregatorServiceName { + if serviceName == apihandler.M3AggregatorServiceName { req = httptest.NewRequest(AddHTTPMethod, M3AggAddURL, strings.NewReader(`{"instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`)) } else { req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`)) @@ -235,7 +236,7 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) { newInst, }) - if serviceName == M3CoordinatorServiceName { + if serviceName == apihandler.M3CoordinatorServiceName { mockPlacementService.EXPECT().AddInstances(gomock.Any()).Return(returnPlacement.SetVersion(1), nil, nil) } else { mockPlacementService.EXPECT().AddInstances(gomock.Any()).Return(existingPlacement.Clone().SetVersion(1), nil, nil) @@ -246,9 +247,9 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) { body, _ = ioutil.ReadAll(resp.Body) switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: require.Equal(t, `{"placement":{"instances":{"host1":{"id":"host1","isolationGroup":"rack1","zone":"test","weight":1,"endpoint":"http://host1:1234","shards":[],"shardSetId":0,"hostname":"host1","port":1234}},"replicaFactor":1,"numShards":0,"isSharded":false,"cutoverTime":"0","isMirrored":false,"maxShardSetId":0},"version":1}`, string(body)) - case M3AggregatorServiceName: + case apihandler.M3AggregatorServiceName: require.Equal(t, `{"placement":{"instances":{},"replicaFactor":1,"numShards":0,"isSharded":true,"cutoverTime":"0","isMirrored":true,"maxShardSetId":0},"version":1}`, string(body)) default: require.Equal(t, `{"placement":{"instances":{},"replicaFactor":0,"numShards":0,"isSharded":true,"cutoverTime":"0","isMirrored":false,"maxShardSetId":0},"version":1}`, string(body)) diff --git a/src/query/api/v1/handler/placement/common.go b/src/query/api/v1/handler/placement/common.go index 822c8d9713..67a3ebcddc 100644 --- a/src/query/api/v1/handler/placement/common.go +++ b/src/query/api/v1/handler/placement/common.go @@ -35,8 +35,9 @@ import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "github.com/gorilla/mux" ) @@ -52,57 +53,29 @@ func (a allowedServicesSet) String() []string { } const ( - // M3DBServiceName is the service name for M3DB. - M3DBServiceName = "m3db" - // M3AggregatorServiceName is the service name for M3Aggregator. - M3AggregatorServiceName = "m3aggregator" - // M3CoordinatorServiceName is the service name for M3Coordinator. - M3CoordinatorServiceName = "m3coordinator" // ServicesPathName is the services part of the API path. ServicesPathName = "services" // PlacementPathName is the placement part of the API path. PlacementPathName = "placement" - // DefaultServiceEnvironment is the default service ID environment. - DefaultServiceEnvironment = "default_env" - // DefaultServiceZone is the default service ID zone. - DefaultServiceZone = "embedded" - - // HeaderClusterEnvironmentName is the header used to specify the environment name. - HeaderClusterEnvironmentName = "Cluster-Environment-Name" - // HeaderClusterZoneName is the header used to specify the zone name. - HeaderClusterZoneName = "Cluster-Zone-Name" - // HeaderDryRun is the header used to specify whether this should be a dry run. - HeaderDryRun = "Dry-Run" - - defaultM3AggMaxAggregationWindowSize = time.Minute - // defaultM3AggWarmupDuration configures the buffer to account for the delay - // of propagating aggregator placement to clients, usually needed when there is - // a large amount of clients sending traffic to m3aggregator. - defaultM3AggWarmupDuration = 0 - m3AggregatorPlacementNamespace = "/placement" ) var ( - errServiceNameIsRequired = errors.New("service name is required") - errServiceEnvironmentIsRequired = errors.New("service environment is required") - errServiceZoneIsRequired = errors.New("service zone is required") - errUnableToParseService = errors.New("unable to parse service") - errM3AggServiceOptionsRequired = errors.New("m3agg service options are required") - - allowedServices = allowedServicesSet{ - M3DBServiceName: true, - M3AggregatorServiceName: true, - M3CoordinatorServiceName: true, - } - // M3DBServicePlacementPathName is the M3DB service placement API path. - M3DBServicePlacementPathName = path.Join(ServicesPathName, M3DBServiceName, PlacementPathName) + M3DBServicePlacementPathName = path.Join(ServicesPathName, handler.M3DBServiceName, PlacementPathName) // M3AggServicePlacementPathName is the M3Agg service placement API path. - M3AggServicePlacementPathName = path.Join(ServicesPathName, M3AggregatorServiceName, PlacementPathName) + M3AggServicePlacementPathName = path.Join(ServicesPathName, handler.M3AggregatorServiceName, PlacementPathName) // M3CoordinatorServicePlacementPathName is the M3Coordinator service placement API path. - M3CoordinatorServicePlacementPathName = path.Join(ServicesPathName, M3CoordinatorServiceName, PlacementPathName) + M3CoordinatorServicePlacementPathName = path.Join(ServicesPathName, handler.M3CoordinatorServiceName, PlacementPathName) + + errUnableToParseService = errors.New("unable to parse service") + + allowedServices = allowedServicesSet{ + handler.M3DBServiceName: true, + handler.M3AggregatorServiceName: true, + handler.M3CoordinatorServiceName: true, + } ) // HandlerOptions is the options struct for the handler. @@ -112,14 +85,14 @@ type HandlerOptions struct { ClusterClient clusterclient.Client Config config.Configuration - M3AggServiceOptions *M3AggServiceOptions + M3AggServiceOptions *handler.M3AggServiceOptions } // NewHandlerOptions is the constructor function for HandlerOptions. func NewHandlerOptions( client clusterclient.Client, cfg config.Configuration, - m3AggOpts *M3AggServiceOptions, + m3AggOpts *handler.M3AggServiceOptions, ) HandlerOptions { return HandlerOptions{ ClusterClient: client, @@ -136,68 +109,10 @@ type Handler struct { nowFn func() time.Time } -// ServiceOptions are the options for Service. -type ServiceOptions struct { - ServiceName string - ServiceEnvironment string - ServiceZone string - - M3Agg *M3AggServiceOptions - - DryRun bool -} - -// M3AggServiceOptions contains the service options that are -// specific to the M3Agg service. -type M3AggServiceOptions struct { - MaxAggregationWindowSize time.Duration - WarmupDuration time.Duration -} - -// NewServiceOptions returns a ServiceOptions based on the provided -// values. -func NewServiceOptions( - serviceName string, headers http.Header, m3AggOpts *M3AggServiceOptions) ServiceOptions { - opts := ServiceOptions{ - ServiceName: serviceName, - ServiceEnvironment: DefaultServiceEnvironment, - ServiceZone: DefaultServiceZone, - - DryRun: false, - - M3Agg: &M3AggServiceOptions{ - MaxAggregationWindowSize: defaultM3AggMaxAggregationWindowSize, - WarmupDuration: defaultM3AggWarmupDuration, - }, - } - - if v := strings.TrimSpace(headers.Get(HeaderClusterEnvironmentName)); v != "" { - opts.ServiceEnvironment = v - } - if v := strings.TrimSpace(headers.Get(HeaderClusterZoneName)); v != "" { - opts.ServiceZone = v - } - if v := strings.TrimSpace(headers.Get(HeaderDryRun)); v == "true" { - opts.DryRun = true - } - - if m3AggOpts != nil { - if m3AggOpts.MaxAggregationWindowSize > 0 { - opts.M3Agg.MaxAggregationWindowSize = m3AggOpts.MaxAggregationWindowSize - } - - if m3AggOpts.WarmupDuration > 0 { - opts.M3Agg.WarmupDuration = m3AggOpts.MaxAggregationWindowSize - } - } - - return opts -} - // Service gets a placement service from m3cluster client func Service( clusterClient clusterclient.Client, - opts ServiceOptions, + opts handler.ServiceOptions, now time.Time, validationFn placement.ValidateFn, ) (placement.Service, error) { @@ -210,13 +125,13 @@ func Service( // control over placement updates. func ServiceWithAlgo( clusterClient clusterclient.Client, - opts ServiceOptions, + opts handler.ServiceOptions, now time.Time, validationFn placement.ValidateFn, ) (placement.Service, placement.Algorithm, error) { overrides := services.NewOverrideOptions() switch opts.ServiceName { - case M3AggregatorServiceName: + case handler.M3AggregatorServiceName: overrides = overrides. SetNamespaceOptions( overrides.NamespaceOptions(). @@ -229,39 +144,27 @@ func ServiceWithAlgo( return nil, nil, err } + if err := opts.Validate(); err != nil { + return nil, nil, err + } + if _, ok := allowedServices[opts.ServiceName]; !ok { return nil, nil, fmt.Errorf( "invalid service name: %s, must be one of: %s", opts.ServiceName, allowedServices.String()) } - if opts.ServiceName == "" { - return nil, nil, errServiceNameIsRequired - } - if opts.ServiceEnvironment == "" { - return nil, nil, errServiceEnvironmentIsRequired - } - if opts.ServiceZone == "" { - return nil, nil, errServiceZoneIsRequired - } - if opts.ServiceName == M3AggregatorServiceName && opts.M3Agg == nil { - return nil, nil, errM3AggServiceOptionsRequired - } - - sid := services.NewServiceID(). - SetName(opts.ServiceName). - SetEnvironment(opts.ServiceEnvironment). - SetZone(opts.ServiceZone) + sid := opts.ServiceID() pOpts := placement.NewOptions(). SetValidZone(opts.ServiceZone). SetIsSharded(true). SetDryrun(opts.DryRun) switch opts.ServiceName { - case M3CoordinatorServiceName: + case handler.M3CoordinatorServiceName: pOpts = pOpts. SetIsSharded(false) - case M3AggregatorServiceName: + case handler.M3AggregatorServiceName: var ( maxAggregationWindowSize = opts.M3Agg.MaxAggregationWindowSize warmupDuration = opts.M3Agg.WarmupDuration @@ -510,7 +413,7 @@ func applyDeprecatedMiddleware( ) func(w http.ResponseWriter, r *http.Request) { return logging.WithResponseTimeAndPanicErrorLoggingFunc( func(w http.ResponseWriter, r *http.Request) { - f(M3DBServiceName, w, r) + f(handler.M3DBServiceName, w, r) }, ).ServeHTTP } @@ -547,7 +450,7 @@ func parseServiceFromRequest(r *http.Request) (string, error) { func isStateless(serviceName string) bool { switch serviceName { - case M3CoordinatorServiceName: + case handler.M3CoordinatorServiceName: return true } return false diff --git a/src/query/api/v1/handler/placement/common_test.go b/src/query/api/v1/handler/placement/common_test.go index ef0c8ad383..bceaf5eb1b 100644 --- a/src/query/api/v1/handler/placement/common_test.go +++ b/src/query/api/v1/handler/placement/common_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/shard" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -52,7 +53,7 @@ func TestPlacementService(t *testing.T) { mockServices.EXPECT().PlacementService(gomock.Not(nil), gomock.Not(nil)).Return(mockPlacementService, nil) placementService, algo, err := ServiceWithAlgo( - mockClient, NewServiceOptions(M3DBServiceName, nil, nil), time.Time{}, nil) + mockClient, handler.NewServiceOptions(handler.M3DBServiceName, nil, nil), time.Time{}, nil) assert.NoError(t, err) assert.NotNil(t, placementService) assert.NotNil(t, algo) @@ -60,7 +61,7 @@ func TestPlacementService(t *testing.T) { // Test Services returns error mockClient.EXPECT().Services(gomock.Not(nil)).Return(nil, errors.New("dummy service error")) placementService, err = Service( - mockClient, NewServiceOptions(M3DBServiceName, nil, nil), time.Time{}, nil) + mockClient, handler.NewServiceOptions(handler.M3DBServiceName, nil, nil), time.Time{}, nil) assert.Nil(t, placementService) assert.EqualError(t, err, "dummy service error") @@ -68,7 +69,7 @@ func TestPlacementService(t *testing.T) { mockClient.EXPECT().Services(gomock.Not(nil)).Return(mockServices, nil) mockServices.EXPECT().PlacementService(gomock.Not(nil), gomock.Not(nil)).Return(nil, errors.New("dummy placement error")) placementService, err = Service( - mockClient, NewServiceOptions(M3DBServiceName, nil, nil), time.Time{}, nil) + mockClient, handler.NewServiceOptions(handler.M3DBServiceName, nil, nil), time.Time{}, nil) assert.Nil(t, placementService) assert.EqualError(t, err, "dummy placement error") }) @@ -99,10 +100,10 @@ func TestPlacementServiceWithClusterHeaders(t *testing.T) { }) var ( - serviceValue = M3DBServiceName + serviceValue = handler.M3DBServiceName environmentValue = "bar_env" zoneValue = "baz_zone" - opts = NewServiceOptions(serviceValue, nil, nil) + opts = handler.NewServiceOptions(serviceValue, nil, nil) ) opts.ServiceEnvironment = environmentValue opts.ServiceZone = zoneValue @@ -287,14 +288,14 @@ func runForAllAllowedServices(f func(service string)) { func TestIsStateless(t *testing.T) { for _, s := range []string{ - M3CoordinatorServiceName, + handler.M3CoordinatorServiceName, } { assert.True(t, isStateless(s)) } for _, s := range []string{ - M3AggregatorServiceName, - M3DBServiceName, + handler.M3AggregatorServiceName, + handler.M3DBServiceName, } { assert.False(t, isStateless(s)) } diff --git a/src/query/api/v1/handler/placement/delete.go b/src/query/api/v1/handler/placement/delete.go index 7765309701..713eca8ca1 100644 --- a/src/query/api/v1/handler/placement/delete.go +++ b/src/query/api/v1/handler/placement/delete.go @@ -31,7 +31,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "github.com/gorilla/mux" "go.uber.org/zap" @@ -87,7 +87,7 @@ func (h *DeleteHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r * var ( force = r.FormValue(placementForceVar) == "true" - opts = NewServiceOptions( + opts = handler.NewServiceOptions( serviceName, r.Header, h.M3AggServiceOptions) ) diff --git a/src/query/api/v1/handler/placement/delete_all.go b/src/query/api/v1/handler/placement/delete_all.go index ca0331a79d..1e9e213c8b 100644 --- a/src/query/api/v1/handler/placement/delete_all.go +++ b/src/query/api/v1/handler/placement/delete_all.go @@ -28,7 +28,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "go.uber.org/zap" ) @@ -68,7 +68,7 @@ func (h *DeleteAllHandler) ServeHTTP(serviceName string, w http.ResponseWriter, var ( ctx = r.Context() logger = logging.WithContext(ctx) - opts = NewServiceOptions( + opts = handler.NewServiceOptions( serviceName, r.Header, h.M3AggServiceOptions) ) diff --git a/src/query/api/v1/handler/placement/delete_test.go b/src/query/api/v1/handler/placement/delete_test.go index 36fec9aed2..2da0656390 100644 --- a/src/query/api/v1/handler/placement/delete_test.go +++ b/src/query/api/v1/handler/placement/delete_test.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/cmd/services/m3query/config" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/golang/mock/gomock" "github.com/gorilla/mux" @@ -93,7 +94,7 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { handlerOpts = NewHandlerOptions( mockClient, config.Configuration{}, - &M3AggServiceOptions{ + &apihandler.M3AggServiceOptions{ WarmupDuration: time.Minute, MaxAggregationWindowSize: 5 * time.Minute, }, @@ -110,14 +111,14 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { handler.nowFn = func() time.Time { return time.Unix(0, 0) } switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: basePlacement = basePlacement. SetIsSharded(false). SetReplicaFactor(1) mockPlacementService.EXPECT(). RemoveInstances([]string{"host1"}). Return(placement.NewPlacement(), nil) - case M3AggregatorServiceName: + case apihandler.M3AggregatorServiceName: basePlacement = basePlacement. SetIsMirrored(true) } @@ -133,7 +134,7 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { body, err := ioutil.ReadAll(resp.Body) require.NoError(t, err) switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: require.Equal(t, http.StatusOK, resp.StatusCode) default: assert.Contains(t, string(body), "instance host1 not found in placement") @@ -151,7 +152,7 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { }) switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: // M3Coordinator placement changes are alway safe because it is stateless default: w = httptest.NewRecorder() @@ -188,7 +189,7 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { var returnPlacement placement.Placement switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: basePlacement. SetIsSharded(false). SetReplicaFactor(1). @@ -197,7 +198,7 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { mockPlacementService.EXPECT(). RemoveInstances([]string{"host1"}). Return(placement.NewPlacement(), nil) - case M3AggregatorServiceName: + case apihandler.M3AggregatorServiceName: // Need to be mirrored in M3Agg case basePlacement.SetReplicaFactor(1).SetMaxShardSetID(2).SetInstances([]placement.Instance{ placement.NewInstance().SetID("host1").SetIsolationGroup("a").SetWeight(10).SetShardSetID(0). @@ -224,7 +225,7 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { shard.NewShard(1).SetState(shard.Available), })), }).SetVersion(2) - case M3DBServiceName: + case apihandler.M3DBServiceName: returnPlacement = basePlacement.Clone().SetInstances([]placement.Instance{ placement.NewInstance().SetID("host1").SetIsolationGroup("a").SetWeight(10). SetShards(shard.NewShards([]shard.Shard{ @@ -259,9 +260,9 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { body, err = ioutil.ReadAll(resp.Body) require.NoError(t, err) switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: require.Equal(t, `{"placement":{"instances":{},"replicaFactor":0,"numShards":0,"isSharded":false,"cutoverTime":"0","isMirrored":false,"maxShardSetId":0},"version":0}`, string(body)) - case M3AggregatorServiceName: + case apihandler.M3AggregatorServiceName: require.Equal(t, `{"placement":{"instances":{"host1":{"id":"host1","isolationGroup":"a","zone":"","weight":10,"endpoint":"","shards":[{"id":0,"state":"LEAVING","sourceId":"","cutoverNanos":"0","cutoffNanos":"300000000000"}],"shardSetId":0,"hostname":"","port":0},"host2":{"id":"host2","isolationGroup":"b","zone":"","weight":10,"endpoint":"","shards":[{"id":0,"state":"INITIALIZING","sourceId":"host1","cutoverNanos":"300000000000","cutoffNanos":"0"},{"id":1,"state":"AVAILABLE","sourceId":"","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":1,"hostname":"","port":0}},"replicaFactor":1,"numShards":0,"isSharded":true,"cutoverTime":"0","isMirrored":true,"maxShardSetId":2},"version":2}`, string(body)) default: require.Equal(t, `{"placement":{"instances":{"host1":{"id":"host1","isolationGroup":"a","zone":"","weight":10,"endpoint":"","shards":[{"id":0,"state":"LEAVING","sourceId":"","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":0,"hostname":"","port":0},"host2":{"id":"host2","isolationGroup":"b","zone":"","weight":10,"endpoint":"","shards":[{"id":0,"state":"AVAILABLE","sourceId":"","cutoverNanos":"0","cutoffNanos":"0"},{"id":1,"state":"AVAILABLE","sourceId":"","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":0,"hostname":"","port":0},"host3":{"id":"host3","isolationGroup":"c","zone":"","weight":10,"endpoint":"","shards":[{"id":0,"state":"INITIALIZING","sourceId":"host1","cutoverNanos":"0","cutoffNanos":"0"},{"id":1,"state":"AVAILABLE","sourceId":"","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":0,"hostname":"","port":0}},"replicaFactor":2,"numShards":0,"isSharded":true,"cutoverTime":"0","isMirrored":false,"maxShardSetId":2},"version":2}`, string(body)) diff --git a/src/query/api/v1/handler/placement/get.go b/src/query/api/v1/handler/placement/get.go index 02010ce866..b0521cb864 100644 --- a/src/query/api/v1/handler/placement/get.go +++ b/src/query/api/v1/handler/placement/get.go @@ -115,7 +115,7 @@ func (h *GetHandler) Get( headers = httpReq.Header } - opts := NewServiceOptions( + opts := handler.NewServiceOptions( serviceName, headers, h.M3AggServiceOptions) service, err := Service(h.ClusterClient, opts, h.nowFn(), nil) diff --git a/src/query/api/v1/handler/placement/init.go b/src/query/api/v1/handler/placement/init.go index 9e97b1a07c..1ddecc406f 100644 --- a/src/query/api/v1/handler/placement/init.go +++ b/src/query/api/v1/handler/placement/init.go @@ -123,7 +123,7 @@ func (h *InitHandler) Init( return nil, err } - serviceOpts := NewServiceOptions( + serviceOpts := handler.NewServiceOptions( serviceName, httpReq.Header, h.M3AggServiceOptions) service, err := Service(h.ClusterClient, serviceOpts, h.nowFn(), nil) @@ -133,7 +133,7 @@ func (h *InitHandler) Init( replicationFactor := int(req.ReplicationFactor) switch serviceName { - case M3CoordinatorServiceName: + case handler.M3CoordinatorServiceName: // M3Coordinator placements are stateless replicationFactor = 1 } diff --git a/src/query/api/v1/handler/placement/init_test.go b/src/query/api/v1/handler/placement/init_test.go index 77b76a9a46..af5a345d23 100644 --- a/src/query/api/v1/handler/placement/init_test.go +++ b/src/query/api/v1/handler/placement/init_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cmd/services/m3query/config" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -80,7 +81,7 @@ func TestPlacementInitHandler(t *testing.T) { w = httptest.NewRecorder() req *http.Request ) - if serviceName == M3AggregatorServiceName { + if serviceName == apihandler.M3AggregatorServiceName { req = httptest.NewRequest(InitHTTPMethod, M3DBInitURL, strings.NewReader(`{"instances": [{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234},{"id": "host2","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host2:1234","hostname": "host2","port": 1234}],"num_shards": 16,"replication_factor": 1}`)) } else { req = httptest.NewRequest(InitHTTPMethod, M3DBInitURL, strings.NewReader(`{"instances": [{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234},{"id": "host2","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host2:1234","hostname": "host2","port": 1234}],"num_shards": 16,"replication_factor": 1}`)) @@ -101,7 +102,7 @@ func TestPlacementInitHandler(t *testing.T) { // Test error response w = httptest.NewRecorder() - if serviceName == M3AggregatorServiceName { + if serviceName == apihandler.M3AggregatorServiceName { req = httptest.NewRequest(InitHTTPMethod, M3DBInitURL, strings.NewReader(`{"instances": [{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "host1:1234","hostname": "host1","port": 1234},{"id": "host2","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host2:1234","hostname": "host2","port": 1234}],"num_shards": 64,"replication_factor": 2}`)) } else { req = httptest.NewRequest(InitHTTPMethod, M3DBInitURL, strings.NewReader(`{"instances": [{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "host1:1234","hostname": "host1","port": 1234},{"id": "host2","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host2:1234","hostname": "host2","port": 1234}],"num_shards": 64,"replication_factor": 2}`)) @@ -109,7 +110,7 @@ func TestPlacementInitHandler(t *testing.T) { require.NotNil(t, req) switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: mockPlacementService.EXPECT(). BuildInitialPlacement(gomock.Not(nil), 64, 1). Return(nil, errors.New("unable to build initial placement")) @@ -127,7 +128,7 @@ func TestPlacementInitHandler(t *testing.T) { // Test error response w = httptest.NewRecorder() - if serviceName == M3AggregatorServiceName { + if serviceName == apihandler.M3AggregatorServiceName { req = httptest.NewRequest(InitHTTPMethod, M3DBInitURL, strings.NewReader(`{"instances": [{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "host1:1234","hostname": "host1","port": 1234},{"id": "host2","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host2:1234","hostname": "host2","port": 1234}],"num_shards": 64,"replication_factor": 2}`)) } else { req = httptest.NewRequest(InitHTTPMethod, M3DBInitURL, strings.NewReader(`{"instances": [{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "host1:1234","hostname": "host1","port": 1234},{"id": "host2","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host2:1234","hostname": "host2","port": 1234}],"num_shards": 64,"replication_factor": 2}`)) @@ -135,7 +136,7 @@ func TestPlacementInitHandler(t *testing.T) { require.NotNil(t, req) switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: mockPlacementService.EXPECT(). BuildInitialPlacement(gomock.Not(nil), 64, 1). Return(nil, kv.ErrAlreadyExists) diff --git a/src/query/api/v1/handler/placement/replace.go b/src/query/api/v1/handler/placement/replace.go index cf495c2929..ec7c20ad05 100644 --- a/src/query/api/v1/handler/placement/replace.go +++ b/src/query/api/v1/handler/placement/replace.go @@ -48,11 +48,11 @@ var ( // M3AggReplaceURL is the url for the m3aggregator replace handler (method // POST). - M3AggReplaceURL = path.Join(handler.RoutePrefixV1, M3AggregatorServiceName, replacePathName) + M3AggReplaceURL = path.Join(handler.RoutePrefixV1, handler.M3AggregatorServiceName, replacePathName) // M3CoordinatorReplaceURL is the url for the m3coordinator replace handler // (method POST). - M3CoordinatorReplaceURL = path.Join(handler.RoutePrefixV1, M3CoordinatorServiceName, replacePathName) + M3CoordinatorReplaceURL = path.Join(handler.RoutePrefixV1, handler.M3CoordinatorServiceName, replacePathName) ) // ReplaceHandler is the type for placement replaces. @@ -121,7 +121,7 @@ func (h *ReplaceHandler) Replace( return nil, err } - serviceOpts := NewServiceOptions(serviceName, httpReq.Header, h.M3AggServiceOptions) + serviceOpts := handler.NewServiceOptions(serviceName, httpReq.Header, h.M3AggServiceOptions) service, algo, err := ServiceWithAlgo(h.ClusterClient, serviceOpts, h.nowFn(), nil) if err != nil { return nil, err diff --git a/src/query/api/v1/handler/placement/replace_test.go b/src/query/api/v1/handler/placement/replace_test.go index 83f577eb62..f5a8b7c4a0 100644 --- a/src/query/api/v1/handler/placement/replace_test.go +++ b/src/query/api/v1/handler/placement/replace_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/cmd/services/m3query/config" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -113,7 +114,7 @@ func testPlacementReplaceHandlerSafeErr(t *testing.T, serviceName string) { req := newReplaceRequest("{}") mockPlacementService.EXPECT().Placement().Return(newInitPlacement(), nil) - if serviceName == M3CoordinatorServiceName { + if serviceName == apihandler.M3CoordinatorServiceName { mockPlacementService.EXPECT().CheckAndSet(gomock.Any(), 0). Return(newInitPlacement().SetVersion(1), nil) } @@ -123,7 +124,7 @@ func testPlacementReplaceHandlerSafeErr(t *testing.T, serviceName string) { body, _ := ioutil.ReadAll(resp.Body) switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: assert.Equal(t, http.StatusOK, resp.StatusCode) default: assert.Equal(t, http.StatusBadRequest, resp.StatusCode) @@ -173,10 +174,10 @@ func testPlacementReplaceHandlerSafeOk(t *testing.T, serviceName string) { matcher := gomock.Any() switch serviceName { - case M3DBServiceName: + case apihandler.M3DBServiceName: pl = pl.SetIsSharded(true) matcher = newPlacementReplaceMatcher() - case M3AggregatorServiceName: + case apihandler.M3AggregatorServiceName: pl = pl.SetIsSharded(true).SetIsMirrored(true) matcher = newPlacementReplaceMatcher() default: @@ -185,7 +186,7 @@ func testPlacementReplaceHandlerSafeOk(t *testing.T, serviceName string) { instances := pl.Instances() for i, inst := range instances { newInst := inst.SetIsolationGroup("r1").SetZone("z1").SetWeight(1) - if serviceName == M3CoordinatorServiceName { + if serviceName == apihandler.M3CoordinatorServiceName { newInst = newInst.SetShards(shard.NewShards([]shard.Shard{})) } instances[i] = newInst @@ -245,13 +246,13 @@ func testPlacementReplaceHandlerSafeOk(t *testing.T, serviceName string) { body, _ := ioutil.ReadAll(resp.Body) switch serviceName { - case M3CoordinatorServiceName: + case apihandler.M3CoordinatorServiceName: exp := `{"placement":{"instances":{"B":{"id":"B","isolationGroup":"r1","zone":"z1","weight":1,"endpoint":"","shards":[],"shardSetId":0,"hostname":"","port":0},"C":{"id":"C","isolationGroup":"r1","zone":"z1","weight":1,"endpoint":"","shards":[],"shardSetId":0,"hostname":"","port":0}},"replicaFactor":0,"numShards":0,"isSharded":false,"cutoverTime":"0","isMirrored":false,"maxShardSetId":0},"version":2}` assert.Equal(t, exp, string(body)) - case M3DBServiceName: + case apihandler.M3DBServiceName: exp := `{"placement":{"instances":{"A":{"id":"A","isolationGroup":"r1","zone":"z1","weight":1,"endpoint":"","shards":[{"id":1,"state":"LEAVING","sourceId":"","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":0,"hostname":"","port":0},"B":{"id":"B","isolationGroup":"r1","zone":"z1","weight":1,"endpoint":"","shards":[{"id":1,"state":"AVAILABLE","sourceId":"","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":0,"hostname":"","port":0},"C":{"id":"C","isolationGroup":"r1","zone":"z1","weight":1,"endpoint":"","shards":[{"id":1,"state":"INITIALIZING","sourceId":"A","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":0,"hostname":"","port":0}},"replicaFactor":0,"numShards":0,"isSharded":true,"cutoverTime":"0","isMirrored":false,"maxShardSetId":0},"version":2}` assert.Equal(t, exp, string(body)) - case M3AggregatorServiceName: + case apihandler.M3AggregatorServiceName: exp := `{"placement":{"instances":{"A":{"id":"A","isolationGroup":"r1","zone":"z1","weight":1,"endpoint":"","shards":[{"id":1,"state":"LEAVING","sourceId":"","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":0,"hostname":"","port":0},"B":{"id":"B","isolationGroup":"r1","zone":"z1","weight":1,"endpoint":"","shards":[{"id":1,"state":"AVAILABLE","sourceId":"","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":0,"hostname":"","port":0},"C":{"id":"C","isolationGroup":"r1","zone":"z1","weight":1,"endpoint":"","shards":[{"id":1,"state":"INITIALIZING","sourceId":"A","cutoverNanos":"0","cutoffNanos":"0"}],"shardSetId":0,"hostname":"","port":0}},"replicaFactor":0,"numShards":0,"isSharded":true,"cutoverTime":"0","isMirrored":true,"maxShardSetId":0},"version":2}` assert.Equal(t, exp, string(body)) default: diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index fb0b92b3b7..36072c89a4 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -54,7 +54,7 @@ import ( "github.com/gorilla/mux" "github.com/opentracing-contrib/go-stdlib/nethttp" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" ) @@ -251,7 +251,7 @@ func (h *Handler) RegisterRoutes() error { return nil } -func (h *Handler) m3AggServiceOptions() *placement.M3AggServiceOptions { +func (h *Handler) m3AggServiceOptions() *handler.M3AggServiceOptions { if h.clusters == nil { return nil } @@ -268,7 +268,7 @@ func (h *Handler) m3AggServiceOptions() *placement.M3AggServiceOptions { return nil } - return &placement.M3AggServiceOptions{ + return &handler.M3AggServiceOptions{ MaxAggregationWindowSize: maxResolution, } } diff --git a/src/x/serialize/serialize_mock.go b/src/x/serialize/serialize_mock.go index 79ca2af7e3..10c7c17caf 100644 --- a/src/x/serialize/serialize_mock.go +++ b/src/x/serialize/serialize_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/x/serialize (interfaces: TagEncoder,TagEncoderPool,TagDecoder,TagDecoderPool,MetricTagsIterator,MetricTagsIteratorPool) -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal From c31e3dab69fc0fa55c4ddc0db8ccfb3f523109ce Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Tue, 5 Mar 2019 21:50:36 -0500 Subject: [PATCH 2/3] refactor allowedservices --- src/query/api/v1/handler/allowed_services.go | 56 +++++++++++++++++++ .../api/v1/handler/allowed_services_test.go | 45 +++++++++++++++ src/query/api/v1/handler/database/create.go | 2 +- src/query/api/v1/handler/placement/common.go | 24 ++------ .../api/v1/handler/placement/common_test.go | 2 +- 5 files changed, 107 insertions(+), 22 deletions(-) create mode 100644 src/query/api/v1/handler/allowed_services.go create mode 100644 src/query/api/v1/handler/allowed_services_test.go diff --git a/src/query/api/v1/handler/allowed_services.go b/src/query/api/v1/handler/allowed_services.go new file mode 100644 index 0000000000..075b05c9f1 --- /dev/null +++ b/src/query/api/v1/handler/allowed_services.go @@ -0,0 +1,56 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package handler + +type allowedServicesSet map[string]bool + +func (a allowedServicesSet) String() []string { + s := make([]string, 0, len(a)) + for key := range a { + s = append(s, key) + } + return s +} + +var ( + allowedServices = allowedServicesSet{ + M3DBServiceName: true, + M3AggregatorServiceName: true, + M3CoordinatorServiceName: true, + } +) + +// IsAllowedService returns whether a service name is a valid M3 service. +func IsAllowedService(svc string) bool { + _, ok := allowedServices[svc] + return ok +} + +// AllowedServices returns the list of valid M3 services. +func AllowedServices() []string { + svcs := make([]string, 0, len(allowedServices)) + for svc, allowed := range allowedServices { + if allowed { + svcs = append(svcs, svc) + } + } + return svcs +} diff --git a/src/query/api/v1/handler/allowed_services_test.go b/src/query/api/v1/handler/allowed_services_test.go new file mode 100644 index 0000000000..8d3fe1fa25 --- /dev/null +++ b/src/query/api/v1/handler/allowed_services_test.go @@ -0,0 +1,45 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package handler + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsAllowedService(t *testing.T) { + assert.True(t, IsAllowedService("m3db")) + assert.False(t, IsAllowedService("foo")) +} + +func TestAllowedServices(t *testing.T) { + exp := []string{ + "m3aggregator", + "m3coordinator", + "m3db", + } + + svcs := AllowedServices() + sort.Strings(svcs) + assert.Equal(t, exp, svcs) +} diff --git a/src/query/api/v1/handler/database/create.go b/src/query/api/v1/handler/database/create.go index b1b9a11033..8c8fac3478 100644 --- a/src/query/api/v1/handler/database/create.go +++ b/src/query/api/v1/handler/database/create.go @@ -205,7 +205,7 @@ func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - opts := handler.NewServiceOptions("kv", r.Header, nil) + opts := handler.NewServiceOptions(handler.M3DBServiceName, r.Header, nil) nsRegistry, err = h.namespaceAddHandler.Add(namespaceRequest, opts) if err != nil { logger.Error("unable to add namespace", zap.Error(err)) diff --git a/src/query/api/v1/handler/placement/common.go b/src/query/api/v1/handler/placement/common.go index 67a3ebcddc..4765fb2c35 100644 --- a/src/query/api/v1/handler/placement/common.go +++ b/src/query/api/v1/handler/placement/common.go @@ -42,16 +42,6 @@ import ( "github.com/gorilla/mux" ) -type allowedServicesSet map[string]bool - -func (a allowedServicesSet) String() []string { - s := make([]string, 0, len(a)) - for key := range a { - s = append(s, key) - } - return s -} - const ( // ServicesPathName is the services part of the API path. ServicesPathName = "services" @@ -70,12 +60,6 @@ var ( M3CoordinatorServicePlacementPathName = path.Join(ServicesPathName, handler.M3CoordinatorServiceName, PlacementPathName) errUnableToParseService = errors.New("unable to parse service") - - allowedServices = allowedServicesSet{ - handler.M3DBServiceName: true, - handler.M3AggregatorServiceName: true, - handler.M3CoordinatorServiceName: true, - } ) // HandlerOptions is the options struct for the handler. @@ -148,10 +132,10 @@ func ServiceWithAlgo( return nil, nil, err } - if _, ok := allowedServices[opts.ServiceName]; !ok { + if !handler.IsAllowedService(opts.ServiceName) { return nil, nil, fmt.Errorf( - "invalid service name: %s, must be one of: %s", - opts.ServiceName, allowedServices.String()) + "invalid service name: %s, must be one of: %v", + opts.ServiceName, handler.AllowedServices()) } sid := opts.ServiceID() @@ -438,7 +422,7 @@ func parseServiceFromRequest(r *http.Request) (string, error) { for i, c := range components { if c == "services" && i+1 < len(components) { service := components[i+1] - if _, ok := allowedServices[service]; ok { + if handler.IsAllowedService(service) { return service, nil } return "", fmt.Errorf("unknown service: %s", service) diff --git a/src/query/api/v1/handler/placement/common_test.go b/src/query/api/v1/handler/placement/common_test.go index bceaf5eb1b..317574319d 100644 --- a/src/query/api/v1/handler/placement/common_test.go +++ b/src/query/api/v1/handler/placement/common_test.go @@ -281,7 +281,7 @@ func TestValidateAllAvailable(t *testing.T) { } func runForAllAllowedServices(f func(service string)) { - for service := range allowedServices { + for _, service := range handler.AllowedServices() { f(service) } } From f911a3e680e34795f54716f359a796e78e624363 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 6 Mar 2019 12:22:02 -0500 Subject: [PATCH 3/3] change allowed svcs map type --- src/query/api/v1/handler/allowed_services.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/query/api/v1/handler/allowed_services.go b/src/query/api/v1/handler/allowed_services.go index 075b05c9f1..604ed1f6db 100644 --- a/src/query/api/v1/handler/allowed_services.go +++ b/src/query/api/v1/handler/allowed_services.go @@ -20,7 +20,7 @@ package handler -type allowedServicesSet map[string]bool +type allowedServicesSet map[string]struct{} func (a allowedServicesSet) String() []string { s := make([]string, 0, len(a)) @@ -32,9 +32,9 @@ func (a allowedServicesSet) String() []string { var ( allowedServices = allowedServicesSet{ - M3DBServiceName: true, - M3AggregatorServiceName: true, - M3CoordinatorServiceName: true, + M3DBServiceName: struct{}{}, + M3AggregatorServiceName: struct{}{}, + M3CoordinatorServiceName: struct{}{}, } ) @@ -47,10 +47,8 @@ func IsAllowedService(svc string) bool { // AllowedServices returns the list of valid M3 services. func AllowedServices() []string { svcs := make([]string, 0, len(allowedServices)) - for svc, allowed := range allowedServices { - if allowed { - svcs = append(svcs, svc) - } + for svc := range allowedServices { + svcs = append(svcs, svc) } return svcs }