Skip to content

Commit

Permalink
[api] support zone/env overrides in ns handler
Browse files Browse the repository at this point in the history
The placement APIs support zone/environment overrides via HTTP headers.
This adds that functionality to the namespace APIs so users can manage
multiple M3DB clusters from a single coordinator.
  • Loading branch information
schallert committed Mar 6, 2019
1 parent d78eee5 commit 8b1fee8
Show file tree
Hide file tree
Showing 21 changed files with 361 additions and 189 deletions.
9 changes: 5 additions & 4 deletions src/query/api/v1/handler/database/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/placement"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/net/http"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/golang/protobuf/jsonpb"
"go.uber.org/zap"
Expand Down Expand Up @@ -162,7 +162,7 @@ func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger = logging.WithContext(ctx)
)

currPlacement, _, err := h.placementGetHandler.Get(placement.M3DBServiceName, nil)
currPlacement, _, err := h.placementGetHandler.Get(handler.M3DBServiceName, nil)
if err != nil {
logger.Error("unable to get placement", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
Expand Down Expand Up @@ -205,7 +205,8 @@ func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

nsRegistry, err = h.namespaceAddHandler.Add(namespaceRequest)
opts := handler.NewServiceOptions("kv", r.Header, nil)
nsRegistry, err = h.namespaceAddHandler.Add(namespaceRequest, opts)
if err != nil {
logger.Error("unable to add namespace", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
Expand Down Expand Up @@ -241,7 +242,7 @@ func (h *createHandler) maybeInitPlacement(
// If we're here then there is no existing placement, so just create it. This is safe because in
// the case where a placement did not already exist, the parse function above validated that we
// have all the required information to create a placement.
newPlacement, err := h.placementInitHandler.Init(placement.M3DBServiceName, r, placementRequest)
newPlacement, err := h.placementInitHandler.Init(handler.M3DBServiceName, r, placementRequest)
if err != nil {
return nil, false, err
}
Expand Down
8 changes: 8 additions & 0 deletions src/query/api/v1/handler/database/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) {
defer ctrl.Finish()

mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -245,6 +246,7 @@ func TestLocalTypeWithNumShards(t *testing.T) {
defer ctrl.Finish()

mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -349,6 +351,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) {
defer ctrl.Finish()

mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -454,6 +457,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) {
defer ctrl.Finish()

mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -571,6 +575,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsHostsProvided(t *testing.T) {
defer ctrl.Finish()

mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(nil, nil).AnyTimes()
createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -669,6 +674,7 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) {
defer ctrl.Finish()

mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -810,6 +816,8 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) {
defer ctrl.Finish()

mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil)

createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg)
w := httptest.NewRecorder()

Expand Down
14 changes: 14 additions & 0 deletions src/query/api/v1/handler/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,18 @@ const (

// DeprecatedHeader is the M3 deprecated header
DeprecatedHeader = "M3-Deprecated"

// DefaultServiceEnvironment is the default service ID environment.
DefaultServiceEnvironment = "default_env"
// DefaultServiceZone is the default service ID zone.
DefaultServiceZone = "embedded"

// HeaderClusterEnvironmentName is the header used to specify the environment
// name.
HeaderClusterEnvironmentName = "Cluster-Environment-Name"
// HeaderClusterZoneName is the header used to specify the zone name.
HeaderClusterZoneName = "Cluster-Zone-Name"
// HeaderDryRun is the header used to specify whether this should be a dry
// run.
HeaderDryRun = "Dry-Run"
)
12 changes: 9 additions & 3 deletions src/query/api/v1/handler/namespace/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"path"

clusterclient "github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/cluster/kv"
nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace"
"github.com/m3db/m3/src/dbnode/storage/namespace"
"github.com/m3db/m3/src/query/api/v1/handler"
Expand Down Expand Up @@ -72,7 +73,8 @@ func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

nsRegistry, err := h.Add(md)
opts := handler.NewServiceOptions("kv", r.Header, nil)
nsRegistry, err := h.Add(md, opts)
if err != nil {
if err == errNamespaceExists {
logger.Error("namespace already exists", zap.Error(err))
Expand Down Expand Up @@ -108,15 +110,19 @@ func (h *AddHandler) parseRequest(r *http.Request) (*admin.NamespaceAddRequest,
}

// Add adds a namespace.
func (h *AddHandler) Add(addReq *admin.NamespaceAddRequest) (nsproto.Registry, error) {
func (h *AddHandler) Add(addReq *admin.NamespaceAddRequest, opts handler.ServiceOptions) (nsproto.Registry, error) {
var emptyReg = nsproto.Registry{}

md, err := namespace.ToMetadata(addReq.Name, addReq.Options)
if err != nil {
return emptyReg, fmt.Errorf("unable to get metadata: %v", err)
}

store, err := h.client.KV()
kvOpts := kv.NewOverrideOptions().
SetEnvironment(opts.ServiceEnvironment).
SetZone(opts.ServiceZone)

store, err := h.client.Store(kvOpts)
if err != nil {
return emptyReg, err
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/api/v1/handler/namespace/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const testAddJSON = `
func TestNamespaceAddHandler(t *testing.T) {
mockClient, mockKV, _ := SetupNamespaceTest(t)
addHandler := NewAddHandler(mockClient)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil)

// Error case where required fields are not set
w := httptest.NewRecorder()
Expand Down Expand Up @@ -108,6 +109,7 @@ func TestNamespaceAddHandler(t *testing.T) {
func TestNamespaceAddHandler_Conflict(t *testing.T) {
mockClient, mockKV, ctrl := SetupNamespaceTest(t)
addHandler := NewAddHandler(mockClient)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil)

// Ensure adding an existing namespace returns 409
req := httptest.NewRequest("POST", "/namespace", strings.NewReader(testAddJSON))
Expand Down
135 changes: 135 additions & 0 deletions src/query/api/v1/handler/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package handler

import (
"errors"
"net/http"
"strings"
"time"

"github.com/m3db/m3/src/cluster/services"
)

const (
// M3DBServiceName is the service name for M3DB.
M3DBServiceName = "m3db"
// M3AggregatorServiceName is the service name for M3Aggregator.
M3AggregatorServiceName = "m3aggregator"
// M3CoordinatorServiceName is the service name for M3Coordinator.
M3CoordinatorServiceName = "m3coordinator"

defaultM3AggMaxAggregationWindowSize = time.Minute
// defaultM3AggWarmupDuration configures the buffer to account for the delay
// of propagating aggregator placement to clients, usually needed when there is
// a large amount of clients sending traffic to m3aggregator.
defaultM3AggWarmupDuration = 0
)

var (
errServiceNameIsRequired = errors.New("service name is required")
errServiceEnvironmentIsRequired = errors.New("service environment is required")
errServiceZoneIsRequired = errors.New("service zone is required")
errM3AggServiceOptionsRequired = errors.New("m3agg service options are required")
)

// ServiceOptions are the options for Service.
type ServiceOptions struct {
ServiceName string
ServiceEnvironment string
ServiceZone string

M3Agg *M3AggServiceOptions

DryRun bool
}

// M3AggServiceOptions contains the service options that are
// specific to the M3Agg service.
type M3AggServiceOptions struct {
MaxAggregationWindowSize time.Duration
WarmupDuration time.Duration
}

// NewServiceOptions returns a ServiceOptions based on the provided
// values.
func NewServiceOptions(
serviceName string, headers http.Header, m3AggOpts *M3AggServiceOptions) ServiceOptions {
opts := ServiceOptions{
ServiceName: serviceName,
ServiceEnvironment: DefaultServiceEnvironment,
ServiceZone: DefaultServiceZone,

DryRun: false,

M3Agg: &M3AggServiceOptions{
MaxAggregationWindowSize: defaultM3AggMaxAggregationWindowSize,
WarmupDuration: defaultM3AggWarmupDuration,
},
}

if v := strings.TrimSpace(headers.Get(HeaderClusterEnvironmentName)); v != "" {
opts.ServiceEnvironment = v
}
if v := strings.TrimSpace(headers.Get(HeaderClusterZoneName)); v != "" {
opts.ServiceZone = v
}
if v := strings.TrimSpace(headers.Get(HeaderDryRun)); v == "true" {
opts.DryRun = true
}

if m3AggOpts != nil {
if m3AggOpts.MaxAggregationWindowSize > 0 {
opts.M3Agg.MaxAggregationWindowSize = m3AggOpts.MaxAggregationWindowSize
}

if m3AggOpts.WarmupDuration > 0 {
opts.M3Agg.WarmupDuration = m3AggOpts.WarmupDuration
}
}

return opts
}

// Validate ensures the service options are valid.
func (opts *ServiceOptions) Validate() error {
if opts.ServiceName == "" {
return errServiceNameIsRequired
}
if opts.ServiceEnvironment == "" {
return errServiceEnvironmentIsRequired
}
if opts.ServiceZone == "" {
return errServiceZoneIsRequired
}
if opts.ServiceName == M3AggregatorServiceName && opts.M3Agg == nil {
return errM3AggServiceOptionsRequired
}
return nil
}

// ServiceID constructs a cluster services ID from the options.
func (opts *ServiceOptions) ServiceID() services.ServiceID {
return services.NewServiceID().
SetName(opts.ServiceName).
SetEnvironment(opts.ServiceEnvironment).
SetZone(opts.ServiceZone)
}
Loading

0 comments on commit 8b1fee8

Please sign in to comment.