diff --git a/glide.lock b/glide.lock index a9005b59af..fab412fd3c 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: 6f999e6981ec189be9b4b4f8c7f4860c3ab304c39ec75bf8e7531094a32cb259 -updated: 2019-10-03T19:08:41.08838-04:00 +updated: 2019-10-04T13:32:26.527615-04:00 imports: - name: github.com/alecthomas/units version: f65c72e2690dc4b403c8bd637baf4611cd4c069b @@ -65,6 +65,7 @@ imports: - pkg/adt - pkg/contention - pkg/cors + - pkg/cpuutil - pkg/crc - pkg/debugutil - pkg/fileutil @@ -111,7 +112,7 @@ imports: subpackages: - capnslog - name: github.com/couchbase/vellum - version: ef2e028c01fdb60c46da4067d2e83745b8d54120 + version: 41f2deade2cfab59facd263e918d7c05f656c2e9 subpackages: - utf8 - name: github.com/davecgh/go-spew @@ -129,7 +130,7 @@ imports: - name: github.com/ghodss/yaml version: 0ca9ea5df5451ffdf184b4428c902747c2c11cd7 - name: github.com/go-kit/kit - version: 0fadbe99ddb1ee87bcf5ea2ca9b84fcfe585c68b + version: dc489b75b9cdbf29c739534c2aa777cabb034954 subpackages: - log - log/level @@ -198,7 +199,7 @@ imports: - runtime/internal - utilities - name: github.com/hashicorp/hcl - version: 5ef25cc8e679070cb61d5d33813645ad59a9d50e + version: cf7d376da96d9cecec7c7483cec2735efe54a410 subpackages: - hcl/ast - hcl/parser @@ -224,6 +225,8 @@ imports: - internal - name: github.com/jonboulle/clockwork version: 2eee05ed794112d45db504eb05aa693efd2b8b09 +- name: github.com/kr/logfmt + version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0 - name: github.com/leanovate/gopter version: e2604588f4db2d2e5eb78ae75d615516f55873e3 subpackages: @@ -278,7 +281,7 @@ imports: - name: github.com/mitchellh/mapstructure version: 3536a929edddb9a5b34bd6861dc4a9647cb459fe - name: github.com/oklog/ulid - version: a41d229797d91e914323f27d52d1053d2e709119 + version: e51a56f2a4c1bf73c967ca6d45d5366bade31943 - name: github.com/opentracing-contrib/go-stdlib version: cf7a6c988dc994e945d2715565026f3cc8718689 subpackages: @@ -296,7 +299,7 @@ imports: - name: github.com/pelletier/go-toml version: 8fe62057ea2d46ce44254c98e84e810044dbe197 - name: github.com/pilosa/pilosa - version: 74b1bb853e0189f7ffd9c8ef6f36847889230ee7 + version: bc9747cc0f19702d9753de7ea9375d8311dfc706 subpackages: - logger - stats @@ -485,7 +488,7 @@ imports: - lex/httplex - trace - name: golang.org/x/sync - version: cd5d95a43a6e21273425c7ae415d3df9ea832eeb + version: 112230192c580c3556b8cee6403af37a4fc5f28c subpackages: - errgroup - name: golang.org/x/sys @@ -569,7 +572,9 @@ imports: vcs: git testImports: - name: github.com/glycerine/go-unsnap-stream - version: 81cf024a9e0a906651886e23cb984a852a30b622 + version: 98d31706395aaac22e29676617f2ee37bee55b5a +- name: github.com/mschoch/smat + version: 90eadee771aeab36e8bf796039b8c261bebebe4f - name: github.com/philhofer/fwd version: bb6d471dc95d4fe11e432687f8b70ff496cf3136 - name: github.com/tinylib/msgp diff --git a/scripts/docker-integration-tests/common.sh b/scripts/docker-integration-tests/common.sh index dfccdf1d01..f0e3f9ec75 100644 --- a/scripts/docker-integration-tests/common.sh +++ b/scripts/docker-integration-tests/common.sh @@ -47,7 +47,9 @@ function setup_single_m3db_node { local dbnode_host=${DBNODE_HOST:-dbnode01} local dbnode_port=${DBNODE_PORT:-9000} local dbnode_health_port=${DBNODE_HEALTH_PORT:-9002} + local dbnode_id=${DBNODE_ID:-m3db_local} local coordinator_port=${COORDINATOR_PORT:-7201} + local zone=${ZONE:-embedded} echo "Wait for API to be available" ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ @@ -62,9 +64,9 @@ function setup_single_m3db_node { "replicationFactor": 1, "hosts": [ { - "id": "m3db_local", + "id": "'${dbnode_id}'", "isolation_group": "rack-a", - "zone": "embedded", + "zone": "'${zone}'", "weight": 1024, "address": "'"${dbnode_host}"'", "port": '"${dbnode_port}"' @@ -74,7 +76,7 @@ function setup_single_m3db_node { echo "Wait until placement is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.'${dbnode_id}'.id)" == \"'${dbnode_id}'\" ]' wait_for_namespaces diff --git a/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator/docker-compose.yml b/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator/docker-compose.yml new file mode 100644 index 0000000000..4d920b54f3 --- /dev/null +++ b/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator/docker-compose.yml @@ -0,0 +1,46 @@ +version: "3.5" +services: + dbnode01: + expose: + - "9000-9004" + - "7201" + ports: + - "0.0.0.0:9000-9004:9000-9004" + - "0.0.0.0:7201:7201" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=dbnode01 + volumes: + - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml" + etcd01: + expose: + - "2379-2380" + ports: + - "0.0.0.0:2379-2380:2379-2380" + networks: + - backend + image: quay.io/coreos/etcd:v3.3.10 + command: + - "etcd" + - "--name" + - "etcd01" + - "--listen-peer-urls" + - "http://0.0.0.0:2380" + - "--listen-client-urls" + - "http://0.0.0.0:2379" + - "--advertise-client-urls" + - "http://etcd01:2379" + - "--initial-cluster-token" + - "etcd-cluster-1" + - "--initial-advertise-peer-urls" + - "http://etcd01:2380" + - "--initial-cluster" + - "etcd01=http://etcd01:2380" + - "--initial-cluster-state" + - "new" + - "--data-dir" + - "/var/lib/etcd" +networks: + backend: diff --git a/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator/m3dbnode.yml b/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator/m3dbnode.yml new file mode 100644 index 0000000000..53431cc430 --- /dev/null +++ b/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator/m3dbnode.yml @@ -0,0 +1,85 @@ +coordinator: + listenAddress: + type: "config" + value: "0.0.0.0:7201" + + logging: + level: info + + metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + + tagOptions: + idScheme: quoted + +db: + logging: + level: info + + metrics: + prometheus: + handlerPath: /metrics + sanitization: prometheus + samplingRate: 1.0 + extended: detailed + + listenAddress: 0.0.0.0:9000 + clusterListenAddress: 0.0.0.0:9001 + httpNodeListenAddress: 0.0.0.0:9002 + httpClusterListenAddress: 0.0.0.0:9003 + debugListenAddress: 0.0.0.0:9004 + + hostID: + resolver: environment + envVarName: M3DB_HOST_ID + + client: + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + + gcPercentage: 100 + + writeNewSeriesAsync: true + writeNewSeriesLimitPerSecond: 1048576 + writeNewSeriesBackoffDuration: 2ms + + bootstrap: + # Intentionally disable peers bootstrapper to ensure it doesn't interfere with test. + bootstrappers: + - filesystem + - commitlog + - uninitialized_topology + commitlog: + returnUnfulfilledForCorruptCommitLogFiles: false + + cache: + series: + policy: lru + + commitlog: + flushMaxBytes: 524288 + flushEvery: 1s + queue: + calculationType: fixed + size: 2097152 + + fs: + filePathPrefix: /var/lib/m3db + + config: + service: + env: foo-namespace/foo-cluster + zone: bar-zone + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: bar-zone + endpoints: + - etcd01:2379 diff --git a/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator/test.sh b/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator/test.sh new file mode 100755 index 0000000000..e926048216 --- /dev/null +++ b/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator/test.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +set -xe + +source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh +REVISION=$(git rev-parse HEAD) +SCRIPT_PATH=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/dedicated_etcd_embedded_coordinator +COMPOSE_FILE=$SCRIPT_PATH/docker-compose.yml +export REVISION + +echo "Run etcd and m3dbnode containers" +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes etcd01 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes dbnode01 + +DUMP_DIR="${SCRIPT_PATH}/dump" +DUMP_ZIP="${DUMP_DIR}/dump.zip" + +function defer { + if [ -d $DUMP_DIR ]; then + rm -rf $DUMP_DIR + fi + docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes +} +trap defer EXIT + +# Should be able to setup single db node with custom environment and zone +# using the embedded coordinator without special headers +DBNODE_ID="dbnode01" ZONE="bar-zone" setup_single_m3db_node + +echo "Test the debug dump endpoint works with custom env and zone" +mkdir -p $DUMP_DIR +curl -s http://localhost:9004/debug/dump > $DUMP_ZIP + +unzip -d $DUMP_DIR $DUMP_ZIP + +EXPECTED_FILES="cpu.prof heap.prof goroutine.prof host.json namespace.json placement-m3db.json" +for file in $(echo "${EXPECTED_FILES}" | tr " " "\n"); do + if ! [ -f "${DUMP_DIR}/${file}" ]; then + echo "Expected ${file} but not in dump:" + echo $(ls $DUMP_DIR) + exit 1 + fi +done diff --git a/scripts/docker-integration-tests/simple_v2_batch_apis/test.sh b/scripts/docker-integration-tests/simple_v2_batch_apis/test.sh index 093608123a..4e405fd617 100755 --- a/scripts/docker-integration-tests/simple_v2_batch_apis/test.sh +++ b/scripts/docker-integration-tests/simple_v2_batch_apis/test.sh @@ -63,7 +63,7 @@ function prometheus_remote_write { network=$(docker network ls --format '{{.ID}}' | tail -n 1) out=$((docker run -it --rm --network $network \ $PROMREMOTECLI_IMAGE \ - -u http://coordinator01:7201/api/v1/prom/remote/write \ + -u http://dbnode01:7201/api/v1/prom/remote/write \ -t __name__:${metric_name} \ -h "M3-Metrics-Type: ${metrics_type}" \ -h "M3-Storage-Policy: ${metrics_storage_policy}" \ diff --git a/src/cluster/client/etcd/client.go b/src/cluster/client/etcd/client.go index d48d9ef0ab..b1c7e94f94 100644 --- a/src/cluster/client/etcd/client.go +++ b/src/cluster/client/etcd/client.go @@ -260,7 +260,7 @@ func (c *csclient) etcdClientGen(zone string) (*clientv3.Client, error) { cluster, ok := c.opts.ClusterForZone(zone) if !ok { - return nil, fmt.Errorf("no etcd cluster found for zone %s", zone) + return nil, fmt.Errorf("no etcd cluster found for zone: %s", zone) } err := c.retrier.Attempt(func() error { diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 9e4d505e55..5b9b5ca068 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -39,6 +39,7 @@ import ( "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cluster/kv/util" "github.com/m3db/m3/src/cmd/services/m3dbnode/config" + queryconfig "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" @@ -68,6 +69,8 @@ import ( "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/postings/roaring" + "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/placement" xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/context" xdebug "github.com/m3db/m3/src/x/debug" @@ -316,14 +319,6 @@ func Run(runOpts RunOptions) { opentracing.SetGlobalTracer(tracer) - debugWriter, err := xdebug.NewZipWriterWithDefaultSources( - cpuProfileDuration, - iopts, - ) - if err != nil { - logger.Error("unable to create debug writer", zap.Error(err)) - } - if cfg.Index.MaxQueryIDsConcurrency != 0 { queryIDsWorkerPool := xsync.NewWorkerPool(cfg.Index.MaxQueryIDsConcurrency) queryIDsWorkerPool.Init() @@ -603,6 +598,38 @@ func Run(runOpts RunOptions) { logger.Info("node httpjson: listening", zap.String("address", cfg.HTTPNodeListenAddress)) if cfg.DebugListenAddress != "" { + var debugWriter xdebug.ZipWriter + handlerOpts, err := placement.NewHandlerOptions(syncCfg.ClusterClient, + queryconfig.Configuration{}, nil, iopts) + if err != nil { + logger.Warn("could not create handler options for debug writer", zap.Error(err)) + } else { + envCfg, err := cfg.EnvironmentConfig.Services.SyncCluster() + if err != nil || envCfg.Service == nil { + logger.Warn("could not get cluster config for debug writer", + zap.Error(err), + zap.Bool("envCfgServiceIsNil", envCfg.Service == nil)) + } else { + debugWriter, err = xdebug.NewPlacementAndNamespaceZipWriterWithDefaultSources( + cpuProfileDuration, + syncCfg.ClusterClient, + handlerOpts, + []handler.ServiceNameAndDefaults{ + { + ServiceName: handler.M3DBServiceName, + Defaults: []handler.ServiceOptionsDefault{ + handler.WithDefaultServiceEnvironment(envCfg.Service.Env), + handler.WithDefaultServiceZone(envCfg.Service.Zone), + }, + }, + }, + iopts) + if err != nil { + logger.Error("unable to create debug writer", zap.Error(err)) + } + } + } + go func() { mux := http.DefaultServeMux if debugWriter != nil { diff --git a/src/query/api/v1/handler/database/common.go b/src/query/api/v1/handler/database/common.go index b76202d3cd..6484d20ac5 100644 --- a/src/query/api/v1/handler/database/common.go +++ b/src/query/api/v1/handler/database/common.go @@ -26,6 +26,7 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" @@ -46,13 +47,15 @@ func RegisterRoutes( client clusterclient.Client, cfg config.Configuration, embeddedDbCfg *dbconfig.DBConfiguration, + defaults []handler.ServiceOptionsDefault, instrumentOpts instrument.Options, ) error { wrapped := func(n http.Handler) http.Handler { return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) } - createHandler, err := NewCreateHandler(client, cfg, embeddedDbCfg, instrumentOpts) + createHandler, err := NewCreateHandler(client, cfg, embeddedDbCfg, + defaults, instrumentOpts) if err != nil { return err } diff --git a/src/query/api/v1/handler/database/create.go b/src/query/api/v1/handler/database/create.go index 7ade3402ed..2d1965e1e3 100644 --- a/src/query/api/v1/handler/database/create.go +++ b/src/query/api/v1/handler/database/create.go @@ -135,6 +135,7 @@ type createHandler struct { namespaceGetHandler *namespace.GetHandler namespaceDeleteHandler *namespace.DeleteHandler embeddedDbCfg *dbconfig.DBConfiguration + defaults []handler.ServiceOptionsDefault instrumentOpts instrument.Options } @@ -143,6 +144,7 @@ func NewCreateHandler( client clusterclient.Client, cfg config.Configuration, embeddedDbCfg *dbconfig.DBConfiguration, + defaults []handler.ServiceOptionsDefault, instrumentOpts instrument.Options, ) (http.Handler, error) { placementHandlerOptions, err := placement.NewHandlerOptions(client, @@ -157,17 +159,25 @@ func NewCreateHandler( namespaceGetHandler: namespace.NewGetHandler(client, instrumentOpts), namespaceDeleteHandler: namespace.NewDeleteHandler(client, instrumentOpts), embeddedDbCfg: embeddedDbCfg, + defaults: defaults, instrumentOpts: instrumentOpts, }, nil } +func (h *createHandler) serviceNameAndDefaults() handler.ServiceNameAndDefaults { + return handler.ServiceNameAndDefaults{ + ServiceName: handler.M3DBServiceName, + Defaults: h.defaults, + } +} + func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var ( ctx = r.Context() logger = logging.WithContext(ctx, h.instrumentOpts) ) - - currPlacement, _, err := h.placementGetHandler.Get(handler.M3DBServiceName, nil) + currPlacement, _, err := h.placementGetHandler.Get( + h.serviceNameAndDefaults(), nil) if err != nil { logger.Error("unable to get placement", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) @@ -210,7 +220,8 @@ func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - opts := handler.NewServiceOptions(handler.M3DBServiceName, r.Header, nil) + opts := handler.NewServiceOptions(h.serviceNameAndDefaults(), + r.Header, nil) nsRegistry, err = h.namespaceAddHandler.Add(namespaceRequest, opts) if err != nil { logger.Error("unable to add namespace", zap.Error(err)) @@ -247,7 +258,8 @@ func (h *createHandler) maybeInitPlacement( // If we're here then there is no existing placement, so just create it. This is safe because in // the case where a placement did not already exist, the parse function above validated that we // have all the required information to create a placement. - newPlacement, err := h.placementInitHandler.Init(handler.M3DBServiceName, r, placementRequest) + newPlacement, err := h.placementInitHandler.Init(h.serviceNameAndDefaults(), + r, placementRequest) if err != nil { return nil, false, err } diff --git a/src/query/api/v1/handler/database/create_test.go b/src/query/api/v1/handler/database/create_test.go index b20c5c0403..03c2d8b4b9 100644 --- a/src/query/api/v1/handler/database/create_test.go +++ b/src/query/api/v1/handler/database/create_test.go @@ -38,6 +38,7 @@ import ( "github.com/m3db/m3/src/cluster/services" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/namespace" "github.com/m3db/m3/src/x/instrument" xtest "github.com/m3db/m3/src/x/test" @@ -51,6 +52,11 @@ var ( testDBCfg = &dbconfig.DBConfiguration{ ListenAddress: "0.0.0.0:9000", } + svcDefaultOptions = []handler.ServiceOptionsDefault{ + func(o handler.ServiceOptions) handler.ServiceOptions { + return o + }, + } ) func SetupDatabaseTest( @@ -92,7 +98,7 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -206,7 +212,7 @@ func TestLocalTypeClusteredPlacementAlreadyExists(t *testing.T) { mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -253,7 +259,7 @@ func TestLocalTypeWithNumShards(t *testing.T) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -364,7 +370,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -475,7 +481,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -598,7 +604,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsHostsProvided(t *testing.T) { mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(nil, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -654,7 +660,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsExistingIsLocal(t *testing.T) { mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -701,7 +707,7 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -849,7 +855,7 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil) createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -980,7 +986,7 @@ func TestClusterTypeMissingHostnames(t *testing.T) { mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() @@ -1011,7 +1017,7 @@ func TestBadType(t *testing.T) { mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - nil, instrument.NewOptions()) + nil, svcDefaultOptions, instrument.NewOptions()) require.NoError(t, err) w := httptest.NewRecorder() diff --git a/src/query/api/v1/handler/namespace/add.go b/src/query/api/v1/handler/namespace/add.go index fd907d8b2e..af801a928b 100644 --- a/src/query/api/v1/handler/namespace/add.go +++ b/src/query/api/v1/handler/namespace/add.go @@ -69,7 +69,11 @@ func NewAddHandler( } } -func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *AddHandler) ServeHTTP( + svc handler.ServiceNameAndDefaults, + w http.ResponseWriter, + r *http.Request, +) { ctx := r.Context() logger := logging.WithContext(ctx, h.instrumentOpts) @@ -80,7 +84,7 @@ func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - opts := handler.NewServiceOptions("kv", r.Header, nil) + opts := handler.NewServiceOptions(svc, r.Header, nil) nsRegistry, err := h.Add(md, opts) if err != nil { if err == errNamespaceExists { @@ -117,7 +121,10 @@ func (h *AddHandler) parseRequest(r *http.Request) (*admin.NamespaceAddRequest, } // Add adds a namespace. -func (h *AddHandler) Add(addReq *admin.NamespaceAddRequest, opts handler.ServiceOptions) (nsproto.Registry, error) { +func (h *AddHandler) Add( + addReq *admin.NamespaceAddRequest, + opts handler.ServiceOptions, +) (nsproto.Registry, error) { var emptyReg = nsproto.Registry{} md, err := namespace.ToMetadata(addReq.Name, addReq.Options) diff --git a/src/query/api/v1/handler/namespace/add_test.go b/src/query/api/v1/handler/namespace/add_test.go index 03f93d0f92..c361ec49a8 100644 --- a/src/query/api/v1/handler/namespace/add_test.go +++ b/src/query/api/v1/handler/namespace/add_test.go @@ -83,7 +83,7 @@ func TestNamespaceAddHandler(t *testing.T) { req := httptest.NewRequest("POST", "/namespace", strings.NewReader(jsonInput)) require.NotNil(t, req) - addHandler.ServeHTTP(w, req) + addHandler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, err := ioutil.ReadAll(resp.Body) @@ -100,7 +100,7 @@ func TestNamespaceAddHandler(t *testing.T) { mockKV.EXPECT().Get(M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound) mockKV.EXPECT().CheckAndSet(M3DBNodeNamespacesKey, gomock.Any(), gomock.Not(nil)).Return(1, nil) - addHandler.ServeHTTP(w, req) + addHandler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, _ = ioutil.ReadAll(resp.Body) @@ -147,7 +147,7 @@ func TestNamespaceAddHandler_Conflict(t *testing.T) { mockKV.EXPECT().Get(M3DBNodeNamespacesKey).Return(mockValue, nil) w := httptest.NewRecorder() - addHandler.ServeHTTP(w, req) + addHandler.ServeHTTP(svcDefaults, w, req) resp := w.Result() assert.Equal(t, http.StatusConflict, resp.StatusCode) } diff --git a/src/query/api/v1/handler/namespace/common.go b/src/query/api/v1/handler/namespace/common.go index 3023da1e11..2f66738fd7 100644 --- a/src/query/api/v1/handler/namespace/common.go +++ b/src/query/api/v1/handler/namespace/common.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/cluster/kv" nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" @@ -94,36 +95,52 @@ func Metadata(store kv.Store) ([]namespace.Metadata, int, error) { return nsMap.Metadatas(), value.Version(), nil } -// RegisterRoutes registers the namespace routes +// RegisterRoutes registers the namespace routes. func RegisterRoutes( r *mux.Router, client clusterclient.Client, + defaults []handler.ServiceOptionsDefault, instrumentOpts instrument.Options, ) { wrapped := func(n http.Handler) http.Handler { return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) } + applyMiddleware := func( + f func(svc handler.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request), + defaults []handler.ServiceOptionsDefault, + ) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + svc := handler.ServiceNameAndDefaults{ + ServiceName: handler.M3DBServiceName, + Defaults: defaults, + } + f(svc, w, r) + }) + } // Get M3DB namespaces. - getHandler := wrapped(NewGetHandler(client, instrumentOpts)).ServeHTTP - r.HandleFunc(DeprecatedM3DBGetURL, getHandler).Methods(GetHTTPMethod) - r.HandleFunc(M3DBGetURL, getHandler).Methods(GetHTTPMethod) + getHandler := wrapped(NewGetHandler(client, instrumentOpts)) + r.HandleFunc(DeprecatedM3DBGetURL, getHandler.ServeHTTP).Methods(GetHTTPMethod) + r.HandleFunc(M3DBGetURL, getHandler.ServeHTTP).Methods(GetHTTPMethod) - // Add M3DB mamespaces - addHandler := wrapped(NewAddHandler(client, instrumentOpts)).ServeHTTP - r.HandleFunc(DeprecatedM3DBAddURL, addHandler).Methods(AddHTTPMethod) - r.HandleFunc(M3DBAddURL, addHandler).Methods(AddHTTPMethod) + // Add M3DB namespaces. + addHandler := wrapped( + applyMiddleware(NewAddHandler(client, instrumentOpts).ServeHTTP, defaults)) + r.HandleFunc(DeprecatedM3DBAddURL, addHandler.ServeHTTP).Methods(AddHTTPMethod) + r.HandleFunc(M3DBAddURL, addHandler.ServeHTTP).Methods(AddHTTPMethod) // Delete M3DB namespaces. - deleteHandler := wrapped(NewDeleteHandler(client, instrumentOpts)).ServeHTTP - r.HandleFunc(DeprecatedM3DBDeleteURL, deleteHandler).Methods(DeleteHTTPMethod) - r.HandleFunc(M3DBDeleteURL, deleteHandler).Methods(DeleteHTTPMethod) + deleteHandler := wrapped(NewDeleteHandler(client, instrumentOpts)) + r.HandleFunc(DeprecatedM3DBDeleteURL, deleteHandler.ServeHTTP).Methods(DeleteHTTPMethod) + r.HandleFunc(M3DBDeleteURL, deleteHandler.ServeHTTP).Methods(DeleteHTTPMethod) // Deploy M3DB schemas. - schemaHandler := wrapped(NewSchemaHandler(client, instrumentOpts)).ServeHTTP - r.HandleFunc(M3DBSchemaURL, schemaHandler).Methods(SchemaDeployHTTPMethod) + schemaHandler := wrapped( + applyMiddleware(NewSchemaHandler(client, instrumentOpts).ServeHTTP, defaults)) + r.HandleFunc(M3DBSchemaURL, schemaHandler.ServeHTTP).Methods(SchemaDeployHTTPMethod) // Reset M3DB schemas. - schemaResetHandler := wrapped(NewSchemaResetHandler(client, instrumentOpts)).ServeHTTP - r.HandleFunc(M3DBSchemaURL, schemaResetHandler).Methods(DeleteHTTPMethod) + schemaResetHandler := wrapped( + applyMiddleware(NewSchemaResetHandler(client, instrumentOpts).ServeHTTP, defaults)) + r.HandleFunc(M3DBSchemaURL, schemaResetHandler.ServeHTTP).Methods(DeleteHTTPMethod) } diff --git a/src/query/api/v1/handler/namespace/schema.go b/src/query/api/v1/handler/namespace/schema.go index 9449ecd938..1c49a9c604 100644 --- a/src/query/api/v1/handler/namespace/schema.go +++ b/src/query/api/v1/handler/namespace/schema.go @@ -64,7 +64,11 @@ func NewSchemaHandler( } } -func (h *SchemaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *SchemaHandler) ServeHTTP( + svc handler.ServiceNameAndDefaults, + w http.ResponseWriter, + r *http.Request, +) { ctx := r.Context() logger := logging.WithContext(ctx, h.instrumentOpts) @@ -75,7 +79,7 @@ func (h *SchemaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - opts := handler.NewServiceOptions("kv", r.Header, nil) + opts := handler.NewServiceOptions(svc, r.Header, nil) resp, err := h.Add(md, opts) if err != nil { if err == kv.ErrNotFound || xerrors.InnerError(err) == kv.ErrNotFound { @@ -108,7 +112,10 @@ func (h *SchemaHandler) parseRequest(r *http.Request) (*admin.NamespaceSchemaAdd } // Add adds schema to an existing namespace. -func (h *SchemaHandler) Add(addReq *admin.NamespaceSchemaAddRequest, opts handler.ServiceOptions) (admin.NamespaceSchemaAddResponse, error) { +func (h *SchemaHandler) Add( + addReq *admin.NamespaceSchemaAddRequest, + opts handler.ServiceOptions, +) (admin.NamespaceSchemaAddResponse, error) { var emptyRep = admin.NamespaceSchemaAddResponse{} kvOpts := kv.NewOverrideOptions(). @@ -142,7 +149,11 @@ func NewSchemaResetHandler( } } -func (h *SchemaResetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *SchemaResetHandler) ServeHTTP( + svc handler.ServiceNameAndDefaults, + w http.ResponseWriter, + r *http.Request, +) { ctx := r.Context() logger := logging.WithContext(ctx, h.instrumentOpts) @@ -153,7 +164,7 @@ func (h *SchemaResetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - opts := handler.NewServiceOptions("kv", r.Header, nil) + opts := handler.NewServiceOptions(svc, r.Header, nil) resp, err := h.Reset(md, opts) if err != nil { if err == kv.ErrNotFound || xerrors.InnerError(err) == kv.ErrNotFound { @@ -186,7 +197,10 @@ func (h *SchemaResetHandler) parseRequest(r *http.Request) (*admin.NamespaceSche } // Reset resets schema for an existing namespace. -func (h *SchemaResetHandler) Reset(addReq *admin.NamespaceSchemaResetRequest, opts handler.ServiceOptions) (*admin.NamespaceSchemaResetResponse, error) { +func (h *SchemaResetHandler) Reset( + addReq *admin.NamespaceSchemaResetRequest, + opts handler.ServiceOptions, +) (*admin.NamespaceSchemaResetResponse, error) { var emptyRep = admin.NamespaceSchemaResetResponse{} if !opts.Force { return &emptyRep, fmt.Errorf("CAUTION! Reset schema will prevent proto-enabled namespace from loading, proceed if you know what you are doing, please retry with force set to true") diff --git a/src/query/api/v1/handler/namespace/schema_test.go b/src/query/api/v1/handler/namespace/schema_test.go index 7fbb447edb..876898c126 100644 --- a/src/query/api/v1/handler/namespace/schema_test.go +++ b/src/query/api/v1/handler/namespace/schema_test.go @@ -34,6 +34,7 @@ import ( nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/namespace/kvadmin" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/x/instrument" "github.com/golang/mock/gomock" @@ -94,6 +95,12 @@ message ImportedMessage { ` ) +var ( + svcDefaults = apihandler.ServiceNameAndDefaults{ + ServiceName: "m3db", + } +) + func genTestJSON(t *testing.T) string { tempDir, err := ioutil.TempDir("", "schema_deploy_test") require.NoError(t, err) @@ -144,7 +151,7 @@ func TestSchemaDeploy_KVKeyNotFound(t *testing.T) { require.NotNil(t, req) mockKV.EXPECT().Get(M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound) - addHandler.ServeHTTP(w, req) + addHandler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, err := ioutil.ReadAll(resp.Body) @@ -186,7 +193,7 @@ func TestSchemaDeploy(t *testing.T) { req := httptest.NewRequest("POST", "/schema", strings.NewReader(testSchemaJSON)) require.NotNil(t, req) - schemaHandler.ServeHTTP(w, req) + schemaHandler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) @@ -239,7 +246,7 @@ func TestSchemaDeploy_NamespaceNotFound(t *testing.T) { mockKV.EXPECT().Get(M3DBNodeNamespacesKey).Return(mockValue, nil) w := httptest.NewRecorder() - schemaHandler.ServeHTTP(w, req) + schemaHandler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) assert.Equal(t, http.StatusNotFound, resp.StatusCode) @@ -270,7 +277,7 @@ func TestSchemaReset(t *testing.T) { req := httptest.NewRequest("DELETE", "/schema", strings.NewReader(jsonInput)) require.NotNil(t, req) - schemaHandler.ServeHTTP(w, req) + schemaHandler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) @@ -281,7 +288,7 @@ func TestSchemaReset(t *testing.T) { require.NotNil(t, req) req.Header.Add("Force", "true") - schemaHandler.ServeHTTP(w, req) + schemaHandler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, _ = ioutil.ReadAll(resp.Body) diff --git a/src/query/api/v1/handler/options.go b/src/query/api/v1/handler/options.go index 671e663e08..8a21914b42 100644 --- a/src/query/api/v1/handler/options.go +++ b/src/query/api/v1/handler/options.go @@ -60,7 +60,7 @@ type ServiceOptions struct { M3Agg *M3AggServiceOptions DryRun bool - Force bool + Force bool } // M3AggServiceOptions contains the service options that are @@ -70,23 +70,55 @@ type M3AggServiceOptions struct { WarmupDuration time.Duration } +// ServiceOptionsDefault is a default to apply to service options. +type ServiceOptionsDefault func(o ServiceOptions) ServiceOptions + +// WithDefaultServiceEnvironment returns the default service environment. +func WithDefaultServiceEnvironment(env string) ServiceOptionsDefault { + return func(o ServiceOptions) ServiceOptions { + o.ServiceEnvironment = env + return o + } +} + +// WithDefaultServiceZone returns the default service zone. +func WithDefaultServiceZone(zone string) ServiceOptionsDefault { + return func(o ServiceOptions) ServiceOptions { + o.ServiceZone = zone + return o + } +} + +// ServiceNameAndDefaults is the params used when identifying a service +// and it's service option defaults. +type ServiceNameAndDefaults struct { + ServiceName string + Defaults []ServiceOptionsDefault +} + // NewServiceOptions returns a ServiceOptions based on the provided // values. func NewServiceOptions( - serviceName string, headers http.Header, m3AggOpts *M3AggServiceOptions) ServiceOptions { + service ServiceNameAndDefaults, + headers http.Header, + m3AggOpts *M3AggServiceOptions, +) ServiceOptions { opts := ServiceOptions{ - ServiceName: serviceName, + ServiceName: service.ServiceName, ServiceEnvironment: DefaultServiceEnvironment, ServiceZone: DefaultServiceZone, DryRun: false, - Force: false, + Force: false, M3Agg: &M3AggServiceOptions{ MaxAggregationWindowSize: defaultM3AggMaxAggregationWindowSize, WarmupDuration: defaultM3AggWarmupDuration, }, } + for _, applyDefault := range service.Defaults { + opts = applyDefault(opts) + } if v := strings.TrimSpace(headers.Get(HeaderClusterEnvironmentName)); v != "" { opts.ServiceEnvironment = v diff --git a/src/query/api/v1/handler/options_test.go b/src/query/api/v1/handler/options_test.go index 3507f36310..fa787c3d7e 100644 --- a/src/query/api/v1/handler/options_test.go +++ b/src/query/api/v1/handler/options_test.go @@ -75,7 +75,10 @@ func TestNewServiceOptions(t *testing.T) { for k, v := range test.headers { h.Add(k, v) } - opts := NewServiceOptions(test.service, h, test.aggOpts) + svcDefaults := ServiceNameAndDefaults{ + ServiceName: test.service, + } + opts := NewServiceOptions(svcDefaults, h, test.aggOpts) assert.Equal(t, test.exp, opts) } } diff --git a/src/query/api/v1/handler/placement/add.go b/src/query/api/v1/handler/placement/add.go index a5176656cc..82f12a697b 100644 --- a/src/query/api/v1/handler/placement/add.go +++ b/src/query/api/v1/handler/placement/add.go @@ -66,7 +66,11 @@ func NewAddHandler(opts HandlerOptions) *AddHandler { return &AddHandler{HandlerOptions: opts, nowFn: time.Now} } -func (h *AddHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *http.Request) { +func (h *AddHandler) ServeHTTP( + svc handler.ServiceNameAndDefaults, + w http.ResponseWriter, + r *http.Request, +) { ctx := r.Context() logger := logging.WithContext(ctx, h.instrumentOptions) @@ -76,7 +80,7 @@ func (h *AddHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *htt return } - placement, err := h.Add(serviceName, r, req) + placement, err := h.Add(svc, r, req) if err != nil { status := http.StatusInternalServerError if _, ok := err.(unsafeAddError); ok { @@ -114,7 +118,7 @@ func (h *AddHandler) parseRequest(r *http.Request) (*admin.PlacementAddRequest, // Add adds a placement. func (h *AddHandler) Add( - serviceName string, + svc handler.ServiceNameAndDefaults, httpReq *http.Request, req *admin.PlacementAddRequest, ) (placement.Placement, error) { @@ -123,8 +127,8 @@ func (h *AddHandler) Add( return nil, err } - serviceOpts := handler.NewServiceOptions( - serviceName, httpReq.Header, h.m3AggServiceOptions) + serviceOpts := handler.NewServiceOptions(svc, httpReq.Header, + h.m3AggServiceOptions) var validateFn placement.ValidateFn if !req.Force { validateFn = validateAllAvailable diff --git a/src/query/api/v1/handler/placement/add_test.go b/src/query/api/v1/handler/placement/add_test.go index 5878342e44..42d10ec2e9 100644 --- a/src/query/api/v1/handler/placement/add_test.go +++ b/src/query/api/v1/handler/placement/add_test.go @@ -65,8 +65,11 @@ func TestPlacementAddHandler_Force(t *testing.T) { } require.NotNil(t, req) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } mockPlacementService.EXPECT().AddInstances(gomock.Any()).Return(placement.NewPlacement(), nil, errors.New("no new instances found in the valid zone")) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) @@ -83,7 +86,7 @@ func TestPlacementAddHandler_Force(t *testing.T) { require.NotNil(t, req) mockPlacementService.EXPECT().AddInstances(gomock.Not(nil)).Return(placement.NewPlacement(), nil, nil) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, _ = ioutil.ReadAll(resp.Body) @@ -116,7 +119,11 @@ func TestPlacementAddHandler_SafeErr_NoNewInstance(t *testing.T) { } require.NotNil(t, req) - handler.ServeHTTP(serviceName, w, req) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) @@ -148,7 +155,11 @@ func TestPlacementAddHandler_SafeErr_NotAllAvailable(t *testing.T) { } require.NotNil(t, req) - handler.ServeHTTP(serviceName, w, req) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) @@ -207,7 +218,11 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) { } mockPlacementService.EXPECT().AddInstances(gomock.Any()).Return(nil, nil, errors.New("test err")) - handler.ServeHTTP(serviceName, w, req) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) @@ -240,7 +255,7 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) { } else { mockPlacementService.EXPECT().AddInstances(gomock.Any()).Return(existingPlacement.Clone().SetVersion(1), nil, nil) } - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, _ = ioutil.ReadAll(resp.Body) diff --git a/src/query/api/v1/handler/placement/common.go b/src/query/api/v1/handler/placement/common.go index c105ae6704..558e3e9118 100644 --- a/src/query/api/v1/handler/placement/common.go +++ b/src/query/api/v1/handler/placement/common.go @@ -228,13 +228,14 @@ func ConvertInstancesProto(instancesProto []*placementpb.Instance) ([]placement. // RegisterRoutes registers the placement routes func RegisterRoutes( r *mux.Router, + defaults []handler.ServiceOptionsDefault, opts HandlerOptions, ) { // Init var ( initHandler = NewInitHandler(opts) - deprecatedInitFn = applyDeprecatedMiddleware(initHandler.ServeHTTP, opts.instrumentOptions) - initFn = applyMiddleware(initHandler.ServeHTTP, opts.instrumentOptions) + deprecatedInitFn = applyDeprecatedMiddleware(initHandler.ServeHTTP, defaults, opts.instrumentOptions) + initFn = applyMiddleware(initHandler.ServeHTTP, defaults, opts.instrumentOptions) ) r.HandleFunc(DeprecatedM3DBInitURL, deprecatedInitFn).Methods(InitHTTPMethod) r.HandleFunc(M3DBInitURL, initFn).Methods(InitHTTPMethod) @@ -244,8 +245,8 @@ func RegisterRoutes( // Get var ( getHandler = NewGetHandler(opts) - deprecatedGetFn = applyDeprecatedMiddleware(getHandler.ServeHTTP, opts.instrumentOptions) - getFn = applyMiddleware(getHandler.ServeHTTP, opts.instrumentOptions) + deprecatedGetFn = applyDeprecatedMiddleware(getHandler.ServeHTTP, defaults, opts.instrumentOptions) + getFn = applyMiddleware(getHandler.ServeHTTP, defaults, opts.instrumentOptions) ) r.HandleFunc(DeprecatedM3DBGetURL, deprecatedGetFn).Methods(GetHTTPMethod) r.HandleFunc(M3DBGetURL, getFn).Methods(GetHTTPMethod) @@ -255,8 +256,8 @@ func RegisterRoutes( // Delete all var ( deleteAllHandler = NewDeleteAllHandler(opts) - deprecatedDeleteAllFn = applyDeprecatedMiddleware(deleteAllHandler.ServeHTTP, opts.instrumentOptions) - deleteAllFn = applyMiddleware(deleteAllHandler.ServeHTTP, opts.instrumentOptions) + deprecatedDeleteAllFn = applyDeprecatedMiddleware(deleteAllHandler.ServeHTTP, defaults, opts.instrumentOptions) + deleteAllFn = applyMiddleware(deleteAllHandler.ServeHTTP, defaults, opts.instrumentOptions) ) r.HandleFunc(DeprecatedM3DBDeleteAllURL, deprecatedDeleteAllFn).Methods(DeleteAllHTTPMethod) r.HandleFunc(M3DBDeleteAllURL, deleteAllFn).Methods(DeleteAllHTTPMethod) @@ -266,8 +267,8 @@ func RegisterRoutes( // Add var ( addHandler = NewAddHandler(opts) - deprecatedAddFn = applyDeprecatedMiddleware(addHandler.ServeHTTP, opts.instrumentOptions) - addFn = applyMiddleware(addHandler.ServeHTTP, opts.instrumentOptions) + deprecatedAddFn = applyDeprecatedMiddleware(addHandler.ServeHTTP, defaults, opts.instrumentOptions) + addFn = applyMiddleware(addHandler.ServeHTTP, defaults, opts.instrumentOptions) ) r.HandleFunc(DeprecatedM3DBAddURL, deprecatedAddFn).Methods(AddHTTPMethod) r.HandleFunc(M3DBAddURL, addFn).Methods(AddHTTPMethod) @@ -277,8 +278,8 @@ func RegisterRoutes( // Delete var ( deleteHandler = NewDeleteHandler(opts) - deprecatedDeleteFn = applyDeprecatedMiddleware(deleteHandler.ServeHTTP, opts.instrumentOptions) - deleteFn = applyMiddleware(deleteHandler.ServeHTTP, opts.instrumentOptions) + deprecatedDeleteFn = applyDeprecatedMiddleware(deleteHandler.ServeHTTP, defaults, opts.instrumentOptions) + deleteFn = applyMiddleware(deleteHandler.ServeHTTP, defaults, opts.instrumentOptions) ) r.HandleFunc(DeprecatedM3DBDeleteURL, deprecatedDeleteFn).Methods(DeleteHTTPMethod) r.HandleFunc(M3DBDeleteURL, deleteFn).Methods(DeleteHTTPMethod) @@ -288,7 +289,7 @@ func RegisterRoutes( // Replace var ( replaceHandler = NewReplaceHandler(opts) - replaceFn = applyMiddleware(replaceHandler.ServeHTTP, opts.instrumentOptions) + replaceFn = applyMiddleware(replaceHandler.ServeHTTP, defaults, opts.instrumentOptions) ) r.HandleFunc(M3DBReplaceURL, replaceFn).Methods(ReplaceHTTPMethod) r.HandleFunc(M3AggReplaceURL, replaceFn).Methods(ReplaceHTTPMethod) @@ -396,38 +397,49 @@ func validateAllAvailable(p placement.Placement) error { } func applyMiddleware( - f func(serviceName string, w http.ResponseWriter, r *http.Request), + f func(svc handler.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request), + defaults []handler.ServiceOptionsDefault, instrumentOpts instrument.Options, ) func(w http.ResponseWriter, r *http.Request) { return logging.WithResponseTimeAndPanicErrorLoggingFunc( - parseServiceMiddleware(f), + parseServiceMiddleware(f, defaults), instrumentOpts, ).ServeHTTP } func applyDeprecatedMiddleware( - f func(serviceName string, w http.ResponseWriter, r *http.Request), + f func(svc handler.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request), + defaults []handler.ServiceOptionsDefault, instrumentOpts instrument.Options, ) func(w http.ResponseWriter, r *http.Request) { return logging.WithResponseTimeAndPanicErrorLoggingFunc( func(w http.ResponseWriter, r *http.Request) { - f(handler.M3DBServiceName, w, r) + svc := handler.ServiceNameAndDefaults{ + ServiceName: handler.M3DBServiceName, + Defaults: defaults, + } + f(svc, w, r) }, instrumentOpts, ).ServeHTTP } func parseServiceMiddleware( - next func(serviceName string, w http.ResponseWriter, r *http.Request), + next func(svc handler.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request), + defaults []handler.ServiceOptionsDefault, ) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { - serviceName, err := parseServiceFromRequest(r) + var ( + svc = handler.ServiceNameAndDefaults{Defaults: defaults} + err error + ) + svc.ServiceName, err = parseServiceFromRequest(r) if err != nil { xhttp.Error(w, err, http.StatusBadRequest) return } - next(serviceName, w, r) + next(svc, w, r) } } diff --git a/src/query/api/v1/handler/placement/common_test.go b/src/query/api/v1/handler/placement/common_test.go index 317574319d..457e129fda 100644 --- a/src/query/api/v1/handler/placement/common_test.go +++ b/src/query/api/v1/handler/placement/common_test.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/query/api/v1/handler" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -52,8 +53,12 @@ func TestPlacementService(t *testing.T) { mockClient.EXPECT().Services(gomock.Not(nil)).Return(mockServices, nil) mockServices.EXPECT().PlacementService(gomock.Not(nil), gomock.Not(nil)).Return(mockPlacementService, nil) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + placementService, algo, err := ServiceWithAlgo( - mockClient, handler.NewServiceOptions(handler.M3DBServiceName, nil, nil), time.Time{}, nil) + mockClient, handler.NewServiceOptions(svcDefaults, nil, nil), time.Time{}, nil) assert.NoError(t, err) assert.NotNil(t, placementService) assert.NotNil(t, algo) @@ -61,7 +66,7 @@ func TestPlacementService(t *testing.T) { // Test Services returns error mockClient.EXPECT().Services(gomock.Not(nil)).Return(nil, errors.New("dummy service error")) placementService, err = Service( - mockClient, handler.NewServiceOptions(handler.M3DBServiceName, nil, nil), time.Time{}, nil) + mockClient, handler.NewServiceOptions(svcDefaults, nil, nil), time.Time{}, nil) assert.Nil(t, placementService) assert.EqualError(t, err, "dummy service error") @@ -69,7 +74,7 @@ func TestPlacementService(t *testing.T) { mockClient.EXPECT().Services(gomock.Not(nil)).Return(mockServices, nil) mockServices.EXPECT().PlacementService(gomock.Not(nil), gomock.Not(nil)).Return(nil, errors.New("dummy placement error")) placementService, err = Service( - mockClient, handler.NewServiceOptions(handler.M3DBServiceName, nil, nil), time.Time{}, nil) + mockClient, handler.NewServiceOptions(svcDefaults, nil, nil), time.Time{}, nil) assert.Nil(t, placementService) assert.EqualError(t, err, "dummy placement error") }) @@ -100,10 +105,13 @@ func TestPlacementServiceWithClusterHeaders(t *testing.T) { }) var ( - serviceValue = handler.M3DBServiceName + serviceValue = handler.M3DBServiceName + svcDefaults = apihandler.ServiceNameAndDefaults{ + ServiceName: handler.M3DBServiceName, + } environmentValue = "bar_env" zoneValue = "baz_zone" - opts = handler.NewServiceOptions(serviceValue, nil, nil) + opts = handler.NewServiceOptions(svcDefaults, nil, nil) ) opts.ServiceEnvironment = environmentValue opts.ServiceZone = zoneValue diff --git a/src/query/api/v1/handler/placement/delete.go b/src/query/api/v1/handler/placement/delete.go index d0592b563c..2737fbe5ed 100644 --- a/src/query/api/v1/handler/placement/delete.go +++ b/src/query/api/v1/handler/placement/delete.go @@ -72,7 +72,11 @@ func NewDeleteHandler(opts HandlerOptions) *DeleteHandler { return &DeleteHandler{HandlerOptions: opts, nowFn: time.Now} } -func (h *DeleteHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *http.Request) { +func (h *DeleteHandler) ServeHTTP( + svc handler.ServiceNameAndDefaults, + w http.ResponseWriter, + r *http.Request, +) { var ( ctx = r.Context() logger = logging.WithContext(ctx, h.instrumentOptions) @@ -87,8 +91,7 @@ func (h *DeleteHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r * var ( force = r.FormValue(placementForceVar) == "true" - opts = handler.NewServiceOptions( - serviceName, r.Header, h.m3AggServiceOptions) + opts = handler.NewServiceOptions(svc, r.Header, h.m3AggServiceOptions) ) service, algo, err := ServiceWithAlgo(h.clusterClient, opts, h.nowFn(), nil) @@ -100,7 +103,7 @@ func (h *DeleteHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r * toRemove := []string{id} // There are no unsafe placement changes because M3Coordinator is stateless - if isStateless(serviceName) { + if isStateless(svc.ServiceName) { force = true } diff --git a/src/query/api/v1/handler/placement/delete_all.go b/src/query/api/v1/handler/placement/delete_all.go index 1dc77fc7a0..9b85ca0ecc 100644 --- a/src/query/api/v1/handler/placement/delete_all.go +++ b/src/query/api/v1/handler/placement/delete_all.go @@ -65,12 +65,15 @@ func NewDeleteAllHandler(opts HandlerOptions) *DeleteAllHandler { return &DeleteAllHandler{HandlerOptions: opts, nowFn: time.Now} } -func (h *DeleteAllHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *http.Request) { +func (h *DeleteAllHandler) ServeHTTP( + svc handler.ServiceNameAndDefaults, + w http.ResponseWriter, + r *http.Request, +) { var ( ctx = r.Context() logger = logging.WithContext(ctx, h.instrumentOptions) - opts = handler.NewServiceOptions( - serviceName, r.Header, h.m3AggServiceOptions) + opts = handler.NewServiceOptions(svc, r.Header, h.m3AggServiceOptions) ) service, err := Service(h.clusterClient, opts, h.nowFn(), nil) @@ -81,11 +84,13 @@ func (h *DeleteAllHandler) ServeHTTP(serviceName string, w http.ResponseWriter, if err := service.Delete(); err != nil { if err == kv.ErrNotFound { - logger.Info("cannot delete absent placement", zap.String("service", serviceName)) + logger.Info("cannot delete placement", + zap.String("service", svc.ServiceName), + zap.Error(err)) xhttp.Error(w, err, http.StatusNotFound) return } - logger.Error("unable to delete placement", zap.Error(err)) + logger.Error("error deleting placement", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) return } diff --git a/src/query/api/v1/handler/placement/delete_all_test.go b/src/query/api/v1/handler/placement/delete_all_test.go index eaf30a2640..85333d597a 100644 --- a/src/query/api/v1/handler/placement/delete_all_test.go +++ b/src/query/api/v1/handler/placement/delete_all_test.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cmd/services/m3query/config" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/x/instrument" "github.com/golang/mock/gomock" @@ -46,12 +47,16 @@ func TestPlacementDeleteAllHandler(t *testing.T) { require.NoError(t, err) handler := NewDeleteAllHandler(handlerOpts) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + // Test delete success w := httptest.NewRecorder() req := httptest.NewRequest(DeleteAllHTTPMethod, M3DBDeleteAllURL, nil) require.NotNil(t, req) mockPlacementService.EXPECT().Delete() - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() assert.Equal(t, http.StatusOK, resp.StatusCode) @@ -61,7 +66,7 @@ func TestPlacementDeleteAllHandler(t *testing.T) { req = httptest.NewRequest(DeleteAllHTTPMethod, M3DBDeleteAllURL, nil) require.NotNil(t, req) mockPlacementService.EXPECT().Delete().Return(errors.New("error")) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) @@ -71,7 +76,7 @@ func TestPlacementDeleteAllHandler(t *testing.T) { req = httptest.NewRequest(DeleteAllHTTPMethod, M3DBDeleteAllURL, nil) require.NotNil(t, req) mockPlacementService.EXPECT().Delete().Return(kv.ErrNotFound) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() assert.Equal(t, http.StatusNotFound, resp.StatusCode) diff --git a/src/query/api/v1/handler/placement/delete_test.go b/src/query/api/v1/handler/placement/delete_test.go index fa6c0eea71..80dffae277 100644 --- a/src/query/api/v1/handler/placement/delete_test.go +++ b/src/query/api/v1/handler/placement/delete_test.go @@ -50,13 +50,17 @@ func TestPlacementDeleteHandler_Force(t *testing.T) { config.Configuration{}, nil, instrument.NewOptions()) handler := NewDeleteHandler(handlerOpts) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + // Test remove success w := httptest.NewRecorder() req := httptest.NewRequest(DeleteHTTPMethod, "/placement/host1?force=true", nil) req = mux.SetURLVars(req, map[string]string{"id": "host1"}) require.NotNil(t, req) mockPlacementService.EXPECT().RemoveInstances([]string{"host1"}).Return(placement.NewPlacement(), nil) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, err := ioutil.ReadAll(resp.Body) @@ -70,7 +74,7 @@ func TestPlacementDeleteHandler_Force(t *testing.T) { req = mux.SetURLVars(req, map[string]string{"id": "nope"}) require.NotNil(t, req) mockPlacementService.EXPECT().RemoveInstances([]string{"nope"}).Return(placement.NewPlacement(), errors.New("ID does not exist")) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, err = ioutil.ReadAll(resp.Body) @@ -128,12 +132,16 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { SetIsMirrored(true) } + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + req = mux.SetURLVars(req, map[string]string{"id": "host1"}) require.NotNil(t, req) if !isStateless(serviceName) { mockPlacementService.EXPECT().Placement().Return(basePlacement, nil) } - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, err := ioutil.ReadAll(resp.Body) @@ -165,7 +173,7 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { req = mux.SetURLVars(req, map[string]string{"id": "host1"}) require.NotNil(t, req) mockPlacementService.EXPECT().Placement().Return(basePlacement, nil) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, err = ioutil.ReadAll(resp.Body) @@ -259,7 +267,7 @@ func testDeleteHandlerSafe(t *testing.T, serviceName string) { mockPlacementService.EXPECT().CheckAndSet(gomock.Any(), 0).Return(returnPlacement, nil) } - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, err = ioutil.ReadAll(resp.Body) diff --git a/src/query/api/v1/handler/placement/get.go b/src/query/api/v1/handler/placement/get.go index e76049bc06..1a5c40c531 100644 --- a/src/query/api/v1/handler/placement/get.go +++ b/src/query/api/v1/handler/placement/get.go @@ -70,13 +70,17 @@ func NewGetHandler(opts HandlerOptions) *GetHandler { return &GetHandler{HandlerOptions: opts, nowFn: time.Now} } -func (h *GetHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *http.Request) { +func (h *GetHandler) ServeHTTP( + service handler.ServiceNameAndDefaults, + w http.ResponseWriter, + r *http.Request, +) { var ( ctx = r.Context() logger = logging.WithContext(ctx, h.instrumentOptions) ) - placement, badRequest, err := h.Get(serviceName, r) + placement, badRequest, err := h.Get(service, r) if err != nil && badRequest { xhttp.Error(w, err, http.StatusBadRequest) return @@ -107,7 +111,7 @@ func (h *GetHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *htt // Get gets a placement. func (h *GetHandler) Get( - serviceName string, + svc handler.ServiceNameAndDefaults, httpReq *http.Request, ) (placement placement.Placement, badRequest bool, err error) { var headers http.Header @@ -115,9 +119,7 @@ func (h *GetHandler) Get( headers = httpReq.Header } - opts := handler.NewServiceOptions( - serviceName, headers, h.m3AggServiceOptions) - + opts := handler.NewServiceOptions(svc, headers, h.m3AggServiceOptions) service, err := Service(h.clusterClient, opts, h.nowFn(), nil) if err != nil { return nil, false, err diff --git a/src/query/api/v1/handler/placement/get_test.go b/src/query/api/v1/handler/placement/get_test.go index 64746732e1..11d37014b7 100644 --- a/src/query/api/v1/handler/placement/get_test.go +++ b/src/query/api/v1/handler/placement/get_test.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3/src/cluster/placement/storage" "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cmd/services/m3query/config" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/x/instrument" "github.com/golang/mock/gomock" @@ -125,8 +126,12 @@ func TestPlacementGetHandler(t *testing.T) { placementObj, err := placement.NewPlacementFromProto(placementProto) require.NoError(t, err) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + mockPlacementService.EXPECT().Placement().Return(placementObj, nil) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) @@ -139,7 +144,7 @@ func TestPlacementGetHandler(t *testing.T) { require.NotNil(t, req) mockPlacementService.EXPECT().Placement().Return(nil, errors.New("key not found")) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() assert.Equal(t, http.StatusNotFound, resp.StatusCode) @@ -149,7 +154,7 @@ func TestPlacementGetHandler(t *testing.T) { req = httptest.NewRequest(GetHTTPMethod, "/placement/get?version=foo", nil) require.NotNil(t, req) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() assert.Equal(t, http.StatusBadRequest, resp.StatusCode) @@ -160,7 +165,7 @@ func TestPlacementGetHandler(t *testing.T) { mockPlacementService.EXPECT().PlacementForVersion(12).Return(placementObj.Clone().SetVersion(12), nil) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, _ = ioutil.ReadAll(resp.Body) assert.Equal(t, http.StatusOK, resp.StatusCode) diff --git a/src/query/api/v1/handler/placement/init.go b/src/query/api/v1/handler/placement/init.go index 6f336611c8..8ee3535bf4 100644 --- a/src/query/api/v1/handler/placement/init.go +++ b/src/query/api/v1/handler/placement/init.go @@ -66,7 +66,7 @@ func NewInitHandler(opts HandlerOptions) *InitHandler { return &InitHandler{HandlerOptions: opts, nowFn: time.Now} } -func (h *InitHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *http.Request) { +func (h *InitHandler) ServeHTTP(svc handler.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := logging.WithContext(ctx, h.instrumentOptions) @@ -76,7 +76,7 @@ func (h *InitHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *ht return } - placement, err := h.Init(serviceName, r, req) + placement, err := h.Init(svc, r, req) if err != nil { if err == kv.ErrAlreadyExists { logger.Error("placement already exists", zap.Error(err)) @@ -114,7 +114,7 @@ func (h *InitHandler) parseRequest(r *http.Request) (*admin.PlacementInitRequest // Init initializes a placement. func (h *InitHandler) Init( - serviceName string, + svc handler.ServiceNameAndDefaults, httpReq *http.Request, req *admin.PlacementInitRequest, ) (placement.Placement, error) { @@ -123,16 +123,15 @@ func (h *InitHandler) Init( return nil, err } - serviceOpts := handler.NewServiceOptions( - serviceName, httpReq.Header, h.m3AggServiceOptions) - + serviceOpts := handler.NewServiceOptions(svc, httpReq.Header, + h.m3AggServiceOptions) service, err := Service(h.clusterClient, serviceOpts, h.nowFn(), nil) if err != nil { return nil, err } replicationFactor := int(req.ReplicationFactor) - switch serviceName { + switch svc.ServiceName { case handler.M3CoordinatorServiceName: // M3Coordinator placements are stateless replicationFactor = 1 diff --git a/src/query/api/v1/handler/placement/init_test.go b/src/query/api/v1/handler/placement/init_test.go index c5adc55365..a33c89fa91 100644 --- a/src/query/api/v1/handler/placement/init_test.go +++ b/src/query/api/v1/handler/placement/init_test.go @@ -92,8 +92,12 @@ func TestPlacementInitHandler(t *testing.T) { newPlacement, err := placement.NewPlacementFromProto(initTestPlacementProto) require.NoError(t, err) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + mockPlacementService.EXPECT().BuildInitialPlacement(gomock.Not(nil), 16, 1).Return(newPlacement, nil) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, err := ioutil.ReadAll(resp.Body) require.NoError(t, err) @@ -119,7 +123,7 @@ func TestPlacementInitHandler(t *testing.T) { BuildInitialPlacement(gomock.Not(nil), 64, 2). Return(nil, errors.New("unable to build initial placement")) } - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, err = ioutil.ReadAll(resp.Body) require.NoError(t, err) @@ -146,7 +150,7 @@ func TestPlacementInitHandler(t *testing.T) { Return(nil, kv.ErrAlreadyExists) } - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() _, err = ioutil.ReadAll(resp.Body) require.NoError(t, err) diff --git a/src/query/api/v1/handler/placement/replace.go b/src/query/api/v1/handler/placement/replace.go index 14c60d0686..2815f40cdd 100644 --- a/src/query/api/v1/handler/placement/replace.go +++ b/src/query/api/v1/handler/placement/replace.go @@ -63,7 +63,11 @@ func NewReplaceHandler(opts HandlerOptions) *ReplaceHandler { return &ReplaceHandler{HandlerOptions: opts, nowFn: time.Now} } -func (h *ReplaceHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *http.Request) { +func (h *ReplaceHandler) ServeHTTP( + svc handler.ServiceNameAndDefaults, + w http.ResponseWriter, + r *http.Request, +) { ctx := r.Context() logger := logging.WithContext(ctx, h.instrumentOptions) @@ -73,7 +77,7 @@ func (h *ReplaceHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r return } - placement, err := h.Replace(serviceName, r, req) + placement, err := h.Replace(svc, r, req) if err != nil { status := http.StatusInternalServerError if _, ok := err.(unsafeAddError); ok { @@ -112,7 +116,7 @@ func (h *ReplaceHandler) parseRequest(r *http.Request) (*admin.PlacementReplaceR // Replace replaces instances. func (h *ReplaceHandler) Replace( - serviceName string, + svc handler.ServiceNameAndDefaults, httpReq *http.Request, req *admin.PlacementReplaceRequest, ) (placement.Placement, error) { @@ -121,7 +125,7 @@ func (h *ReplaceHandler) Replace( return nil, err } - serviceOpts := handler.NewServiceOptions(serviceName, httpReq.Header, h.m3AggServiceOptions) + serviceOpts := handler.NewServiceOptions(svc, httpReq.Header, h.m3AggServiceOptions) service, algo, err := ServiceWithAlgo(h.clusterClient, serviceOpts, h.nowFn(), nil) if err != nil { return nil, err @@ -138,7 +142,7 @@ func (h *ReplaceHandler) Replace( } // M3Coordinator isn't sharded, can't check if its shards are available. - if !isStateless(serviceName) { + if !isStateless(svc.ServiceName) { if err := validateAllAvailable(curPlacement); err != nil { return nil, err } diff --git a/src/query/api/v1/handler/placement/replace_test.go b/src/query/api/v1/handler/placement/replace_test.go index f43765ba44..fe80d74399 100644 --- a/src/query/api/v1/handler/placement/replace_test.go +++ b/src/query/api/v1/handler/placement/replace_test.go @@ -82,8 +82,12 @@ func testPlacementReplaceHandlerForce(t *testing.T, serviceName string) { w := httptest.NewRecorder() req := newReplaceRequest(`{"force": true, "leavingInstanceIDs": []}`) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + mockPlacementService.EXPECT().ReplaceInstances([]string{}, gomock.Any()).Return(placement.NewPlacement(), nil, errors.New("test")) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) @@ -93,7 +97,7 @@ func testPlacementReplaceHandlerForce(t *testing.T, serviceName string) { w = httptest.NewRecorder() req = newReplaceRequest(`{"force": true, "leavingInstanceIDs": ["a"]}`) mockPlacementService.EXPECT().ReplaceInstances([]string{"a"}, gomock.Not(nil)).Return(placement.NewPlacement(), nil, nil) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp = w.Result() body, _ = ioutil.ReadAll(resp.Body) assert.Equal(t, `{"placement":{"instances":{},"replicaFactor":0,"numShards":0,"isSharded":false,"cutoverTime":"0","isMirrored":false,"maxShardSetId":0},"version":0}`, string(body)) @@ -113,12 +117,16 @@ func testPlacementReplaceHandlerSafeErr(t *testing.T, serviceName string) { w := httptest.NewRecorder() req := newReplaceRequest("{}") + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + mockPlacementService.EXPECT().Placement().Return(newInitPlacement(), nil) if serviceName == apihandler.M3CoordinatorServiceName { mockPlacementService.EXPECT().CheckAndSet(gomock.Any(), 0). Return(newInitPlacement().SetVersion(1), nil) } - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) @@ -238,8 +246,12 @@ func testPlacementReplaceHandlerSafeOk(t *testing.T, serviceName string) { SetInstances(instances). SetVersion(2) + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: serviceName, + } + mockPlacementService.EXPECT().CheckAndSet(matcher, 1).Return(returnPl, nil) - handler.ServeHTTP(serviceName, w, req) + handler.ServeHTTP(svcDefaults, w, req) resp := w.Result() body, _ := ioutil.ReadAll(resp.Body) diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index fe2275c643..119fe0f17e 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -92,6 +92,7 @@ type Handler struct { instrumentOpts instrument.Options cpuProfileDuration time.Duration placementServiceNames []string + serviceOptionDefaults []handler.ServiceOptionsDefault } // Router returns the http handler registered with all relevant routes for query. @@ -114,6 +115,7 @@ func NewHandler( instrumentOpts instrument.Options, cpuProfileDuration time.Duration, placementServiceNames []string, + serviceOptionDefaults []handler.ServiceOptionsDefault, ) (*Handler, error) { r := mux.NewRouter() @@ -149,6 +151,7 @@ func NewHandler( instrumentOpts: instrumentOpts, cpuProfileDuration: cpuProfileDuration, placementServiceNames: placementServiceNames, + serviceOptionDefaults: serviceOptionDefaults, }, nil } @@ -278,13 +281,21 @@ func (h *Handler) RegisterRoutes() error { return err } + var placementServices []handler.ServiceNameAndDefaults + for _, serviceName := range h.placementServiceNames { + service := handler.ServiceNameAndDefaults{ + ServiceName: serviceName, + Defaults: h.serviceOptionDefaults, + } + placementServices = append(placementServices, service) + } + debugWriter, err := xdebug.NewPlacementAndNamespaceZipWriterWithDefaultSources( h.cpuProfileDuration, - h.instrumentOpts, h.clusterClient, placementOpts, - h.placementServiceNames, - ) + placementServices, + h.instrumentOpts) if err != nil { return fmt.Errorf("unable to create debug writer: %v", err) } @@ -294,13 +305,13 @@ func (h *Handler) RegisterRoutes() error { wrapped(debugWriter.HTTPHandler()).ServeHTTP) err = database.RegisterRoutes(h.router, h.clusterClient, - h.config, h.embeddedDbCfg, h.instrumentOpts) + h.config, h.embeddedDbCfg, h.serviceOptionDefaults, h.instrumentOpts) if err != nil { return err } - placement.RegisterRoutes(h.router, placementOpts) - namespace.RegisterRoutes(h.router, h.clusterClient, h.instrumentOpts) + placement.RegisterRoutes(h.router, h.serviceOptionDefaults, placementOpts) + namespace.RegisterRoutes(h.router, h.clusterClient, h.serviceOptionDefaults, h.instrumentOpts) topic.RegisterRoutes(h.router, h.clusterClient, h.config, h.instrumentOpts) } diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index 7778d285fd..f08430484b 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -57,6 +57,11 @@ var ( defaultLookbackDuration = time.Minute defaultCPUProfileduration = 5 * time.Second defaultPlacementServices = []string{"m3db"} + svcDefaultOptions = []handler.ServiceOptionsDefault{ + func(o handler.ServiceOptions) handler.ServiceOptions { + return o + }, + } ) func makeTagOptions() models.TagOptions { @@ -95,7 +100,8 @@ func setupHandler(store storage.Storage) (*Handler, error) { models.QueryContextOptions{}, instrumentOpts, defaultCPUProfileduration, - defaultPlacementServices) + defaultPlacementServices, + svcDefaultOptions) } func TestHandlerFetchTimeoutError(t *testing.T) { @@ -107,9 +113,21 @@ func TestHandlerFetchTimeoutError(t *testing.T) { dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &negValue}} engine := newEngine(storage, time.Minute, nil, instrument.NewOptions()) cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration} - _, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine, nil, nil, - cfg, dbconfig, nil, handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), - models.QueryContextOptions{}, instrument.NewOptions(), defaultCPUProfileduration, defaultPlacementServices) + _, err := NewHandler( + downsamplerAndWriter, + makeTagOptions(), + engine, + nil, + nil, + cfg, + dbconfig, + nil, + handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), + models.QueryContextOptions{}, + instrument.NewOptions(), + defaultCPUProfileduration, + defaultPlacementServices, + svcDefaultOptions) require.Error(t, err) } @@ -123,9 +141,21 @@ func TestHandlerFetchTimeout(t *testing.T) { dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &fourMin}} engine := newEngine(storage, time.Minute, nil, instrument.NewOptions()) cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration} - h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine, - nil, nil, cfg, dbconfig, nil, handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), - models.QueryContextOptions{}, instrument.NewOptions(), defaultCPUProfileduration, defaultPlacementServices) + h, err := NewHandler( + downsamplerAndWriter, + makeTagOptions(), + engine, + nil, + nil, + cfg, + dbconfig, + nil, + handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), + models.QueryContextOptions{}, + instrument.NewOptions(), + defaultCPUProfileduration, + defaultPlacementServices, + svcDefaultOptions) require.NoError(t, err) assert.Equal(t, 4*time.Minute, h.timeoutOpts.FetchTimeout) } diff --git a/src/query/server/server.go b/src/query/server/server.go index cfc5151b22..1ddde0f678 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -73,9 +73,8 @@ import ( ) const ( - serviceName = "m3query" - cpuProfileDuration = 5 * time.Second - defaultM3DBServiceName = "m3db" + serviceName = "m3query" + cpuProfileDuration = 5 * time.Second ) var ( @@ -290,10 +289,25 @@ func Run(runOpts RunOptions) { logger.Fatal("unable to create new downsampler and writer", zap.Error(err)) } + var serviceOptionDefaults []handler.ServiceOptionsDefault + if dbCfg := runOpts.DBConfig; dbCfg != nil { + cluster, err := dbCfg.EnvironmentConfig.Services.SyncCluster() + if err != nil { + logger.Fatal("could not resolve embedded db cluster info", + zap.Error(err)) + } + if svcCfg := cluster.Service; svcCfg != nil { + serviceOptionDefaults = append(serviceOptionDefaults, + handler.WithDefaultServiceEnvironment(svcCfg.Env)) + serviceOptionDefaults = append(serviceOptionDefaults, + handler.WithDefaultServiceZone(svcCfg.Zone)) + } + } + handler, err := httpd.NewHandler(downsamplerAndWriter, tagOptions, engine, m3dbClusters, clusterClient, cfg, runOpts.DBConfig, perQueryEnforcer, fetchOptsBuilder, queryCtxOpts, instrumentOptions, cpuProfileDuration, - []string{defaultM3DBServiceName}) + []string{handler.M3DBServiceName}, serviceOptionDefaults) if err != nil { logger.Fatal("unable to set up handlers", zap.Error(err)) } diff --git a/src/x/debug/cpu.go b/src/x/debug/cpu.go index 9d6b4224c1..1f534fb4c9 100644 --- a/src/x/debug/cpu.go +++ b/src/x/debug/cpu.go @@ -22,6 +22,7 @@ package debug import ( "io" + "net/http" "runtime/pprof" "time" ) @@ -36,7 +37,7 @@ func NewCPUProfileSource(h time.Duration) Source { return &cpuProfileSource{duration: h} } -func (c *cpuProfileSource) Write(w io.Writer) error { +func (c *cpuProfileSource) Write(w io.Writer, _ *http.Request) error { if err := pprof.StartCPUProfile(w); err != nil { return err } diff --git a/src/x/debug/cpu_test.go b/src/x/debug/cpu_test.go index 9234bb2290..cef09fa2f6 100644 --- a/src/x/debug/cpu_test.go +++ b/src/x/debug/cpu_test.go @@ -22,6 +22,7 @@ package debug import ( "bytes" + "net/http" "testing" "time" @@ -32,6 +33,6 @@ func TestCPUProfileSource(t *testing.T) { h := time.Duration(1) * time.Second c := NewCPUProfileSource(h) buff := bytes.NewBuffer([]byte{}) - c.Write(buff) + c.Write(buff, &http.Request{}) require.NotZero(t, buff.Len()) } diff --git a/src/x/debug/debug.go b/src/x/debug/debug.go index 73fbded236..9b4d0e6e03 100644 --- a/src/x/debug/debug.go +++ b/src/x/debug/debug.go @@ -28,6 +28,8 @@ import ( "net/http" "time" + "github.com/m3db/m3/src/query/api/v1/handler" + clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/query/api/v1/handler/placement" "github.com/m3db/m3/src/x/instrument" @@ -46,20 +48,20 @@ const ( // file for that source into the overall debug zip file. type Source interface { // Write writes it's debug information into the provided writer. - Write(w io.Writer) error + Write(w io.Writer, r *http.Request) error } // ZipWriter aggregates sources and writes them in a zip file. type ZipWriter interface { // WriteZip writes a ZIP file in the provided writer. // The archive contains the dumps of all sources in separate files. - WriteZip(io.Writer) error + WriteZip(io.Writer, *http.Request) error // RegisterSource adds a new source to the produced archive. - RegisterSource(string, Source) error + RegisterSource(fileName string, source Source) error // HTTPHandler sends out the ZIP file as raw bytes. HTTPHandler() http.Handler // RegisterHandler wires the HTTPHandlerFunc with the given router. - RegisterHandler(string, *http.ServeMux) error + RegisterHandler(handlerPath string, router *http.ServeMux) error } type zipWriter struct { @@ -80,28 +82,32 @@ func NewZipWriter(iopts instrument.Options) ZipWriter { // debug sources already registered: CPU, heap, host, goroutines, namespace and placement info. func NewPlacementAndNamespaceZipWriterWithDefaultSources( cpuProfileDuration time.Duration, - iopts instrument.Options, clusterClient clusterclient.Client, placementsOpts placement.HandlerOptions, - serviceNames []string, + services []handler.ServiceNameAndDefaults, + instrumentOpts instrument.Options, ) (ZipWriter, error) { - zw, err := NewZipWriterWithDefaultSources(cpuProfileDuration, iopts) + zw, err := NewZipWriterWithDefaultSources(cpuProfileDuration, + instrumentOpts) if err != nil { return nil, err } if clusterClient != nil { - err = zw.RegisterSource("namespaceSource", NewNamespaceInfoSource(iopts, clusterClient)) + err = zw.RegisterSource("namespace.json", + NewNamespaceInfoSource(clusterClient, instrumentOpts)) if err != nil { return nil, fmt.Errorf("unable to register namespaceSource: %s", err) } - for _, serviceName := range serviceNames { - placementInfoSource, err := NewPlacementInfoSource(iopts, placementsOpts, serviceName) + for _, service := range services { + placementInfoSource, err := NewPlacementInfoSource(service, + placementsOpts, instrumentOpts) if err != nil { return nil, fmt.Errorf("unable to create placementInfoSource: %v", err) } - err = zw.RegisterSource("placementSource", placementInfoSource) + fileName := fmt.Sprintf("placement-%s.json", service.ServiceName) + err = zw.RegisterSource(fileName, placementInfoSource) if err != nil { return nil, fmt.Errorf("unable to register placementSource: %s", err) } @@ -119,17 +125,17 @@ func NewZipWriterWithDefaultSources( ) (ZipWriter, error) { zw := NewZipWriter(iopts) - err := zw.RegisterSource("cpuSource", NewCPUProfileSource(cpuProfileDuration)) + err := zw.RegisterSource("cpu.prof", NewCPUProfileSource(cpuProfileDuration)) if err != nil { return nil, fmt.Errorf("unable to register CPUProfileSource: %s", err) } - err = zw.RegisterSource("heapSource", NewHeapDumpSource()) + err = zw.RegisterSource("heap.prof", NewHeapDumpSource()) if err != nil { return nil, fmt.Errorf("unable to register HeapDumpSource: %s", err) } - err = zw.RegisterSource("hostSource", NewHostInfoSource()) + err = zw.RegisterSource("host.json", NewHostInfoSource()) if err != nil { return nil, fmt.Errorf("unable to register HostInfoSource: %s", err) } @@ -139,7 +145,7 @@ func NewZipWriterWithDefaultSources( return nil, fmt.Errorf("unable to create goroutineProfileSource: %s", err) } - err = zw.RegisterSource("goroutineProfile", gp) + err = zw.RegisterSource("goroutine.prof", gp) return zw, nil } @@ -155,7 +161,7 @@ func (i *zipWriter) RegisterSource(dumpFileName string, p Source) error { // WriteZip writes a ZIP file with the data from all sources in the given writer. // It will return an error if any of the sources fail to write their data. -func (i *zipWriter) WriteZip(w io.Writer) error { +func (i *zipWriter) WriteZip(w io.Writer, r *http.Request) error { zw := zip.NewWriter(w) defer zw.Close() @@ -164,7 +170,7 @@ func (i *zipWriter) WriteZip(w io.Writer) error { if err != nil { return err } - err = p.Write(fw) + err = p.Write(fw, r) if err != nil { return err } @@ -175,7 +181,7 @@ func (i *zipWriter) WriteZip(w io.Writer) error { func (i *zipWriter) HTTPHandler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { buf := bytes.NewBuffer([]byte{}) - if err := i.WriteZip(buf); err != nil { + if err := i.WriteZip(buf, r); err != nil { xhttp.Error(w, fmt.Errorf("unable to write ZIP file: %s", err), http.StatusInternalServerError) return } @@ -188,6 +194,5 @@ func (i *zipWriter) HTTPHandler() http.Handler { func (i *zipWriter) RegisterHandler(path string, r *http.ServeMux) error { r.Handle(path, i.HTTPHandler()) - return nil } diff --git a/src/x/debug/debug_test.go b/src/x/debug/debug_test.go index 9407733bb5..f5afbb977f 100644 --- a/src/x/debug/debug_test.go +++ b/src/x/debug/debug_test.go @@ -39,6 +39,7 @@ import ( clusterplacement "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cmd/services/m3query/config" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/namespace" "github.com/m3db/m3/src/query/api/v1/handler/placement" "github.com/m3db/m3/src/x/instrument" @@ -63,7 +64,7 @@ type fakeSource struct { content string } -func (f *fakeSource) Write(w io.Writer) error { +func (f *fakeSource) Write(w io.Writer, _ *http.Request) error { f.called = true if f.shouldErr { return errors.New("bad write") @@ -87,7 +88,7 @@ func TestWriteZip(t *testing.T) { zipWriter.RegisterSource("test2", fs2) zipWriter.RegisterSource("test3", fs3) buff := bytes.NewBuffer([]byte{}) - err := zipWriter.WriteZip(buff) + err := zipWriter.WriteZip(buff, &http.Request{}) bytesReader := bytes.NewReader(buff.Bytes()) readerCloser, zerr := zip.NewReader(bytesReader, int64(len(buff.Bytes()))) @@ -125,7 +126,7 @@ func TestWriteZipErr(t *testing.T) { } zipWriter.RegisterSource("test", fs) buff := bytes.NewBuffer([]byte{}) - err := zipWriter.WriteZip(buff) + err := zipWriter.WriteZip(buff, &http.Request{}) require.Error(t, err) require.True(t, fs.called) } @@ -273,17 +274,19 @@ func newHandlerOptsAndClient(t *testing.T) (placement.HandlerOptions, *clustercl func TestDefaultSources(t *testing.T) { defaultSources := []string{ - "cpuSource", - "heapSource", - "hostSource", - "goroutineProfile", - "namespaceSource", - "placementSource", + "cpu.prof", + "heap.prof", + "host.json", + "goroutine.prof", + "namespace.json", + "placement-m3db.json", } handlerOpts, mockClient := newHandlerOptsAndClient(t) - - zw, err := NewPlacementAndNamespaceZipWriterWithDefaultSources(1*time.Second, instrument.NewOptions(), mockClient, handlerOpts, []string{"m3db"}) + svcDefaults := []apihandler.ServiceNameAndDefaults{{ + ServiceName: "m3db", + }} + zw, err := NewPlacementAndNamespaceZipWriterWithDefaultSources(1*time.Second, mockClient, handlerOpts, svcDefaults, instrument.NewOptions()) require.NoError(t, err) require.NotNil(t, zw) @@ -292,14 +295,13 @@ func TestDefaultSources(t *testing.T) { iv := reflect.ValueOf(zw).Elem().Interface() z, ok := iv.(zipWriter) require.True(t, ok) - _, ok = z.sources[source] require.True(t, ok) } // Check writing ZIP is ok buff := bytes.NewBuffer([]byte{}) - err = zw.WriteZip(buff) + err = zw.WriteZip(buff, &http.Request{}) require.NoError(t, err) require.NotZero(t, buff.Len()) diff --git a/src/x/debug/heap.go b/src/x/debug/heap.go index 5f85784292..47041a6529 100644 --- a/src/x/debug/heap.go +++ b/src/x/debug/heap.go @@ -22,6 +22,7 @@ package debug import ( "io" + "net/http" "runtime/pprof" ) @@ -33,6 +34,6 @@ func NewHeapDumpSource() Source { } // Write writes the heapdump in the provided writer. -func (h *heapDumpSource) Write(w io.Writer) error { +func (h *heapDumpSource) Write(w io.Writer, _ *http.Request) error { return pprof.WriteHeapProfile(w) } diff --git a/src/x/debug/heap_test.go b/src/x/debug/heap_test.go index 33617c43d6..8589da594b 100644 --- a/src/x/debug/heap_test.go +++ b/src/x/debug/heap_test.go @@ -22,6 +22,7 @@ package debug import ( "bytes" + "net/http" "testing" "github.com/stretchr/testify/require" @@ -30,6 +31,6 @@ import ( func TestHeapDumpSource(t *testing.T) { heapDumpSource := NewHeapDumpSource() buff := bytes.NewBuffer([]byte{}) - heapDumpSource.Write(buff) + heapDumpSource.Write(buff, &http.Request{}) require.NotZero(t, buff.Len()) } diff --git a/src/x/debug/host.go b/src/x/debug/host.go index 113660d829..f8c6e70d90 100644 --- a/src/x/debug/host.go +++ b/src/x/debug/host.go @@ -23,6 +23,7 @@ package debug import ( "encoding/json" "io" + "net/http" "os" ) @@ -43,7 +44,7 @@ func NewHostInfoSource() Source { // Write fetches data about the host and writes it in the given writer. // The data is formatted in json. // It will return an error if it can't get working directory or marshal. -func (h *hostInfoSource) Write(w io.Writer) error { +func (h *hostInfoSource) Write(w io.Writer, _ *http.Request) error { wd, err := os.Getwd() if err != nil { return err diff --git a/src/x/debug/host_test.go b/src/x/debug/host_test.go index 1ee0c218c1..ba38f7b8ec 100644 --- a/src/x/debug/host_test.go +++ b/src/x/debug/host_test.go @@ -23,6 +23,7 @@ package debug import ( "bytes" "encoding/json" + "net/http" "testing" "github.com/stretchr/testify/require" @@ -31,7 +32,7 @@ import ( func TestHostInfoSource(t *testing.T) { source := NewHostInfoSource() buff := bytes.NewBuffer([]byte{}) - err := source.Write(buff) + err := source.Write(buff, &http.Request{}) require.NoError(t, err) err = json.Unmarshal(buff.Bytes(), &hostInfoSource{}) require.NoError(t, err) diff --git a/src/x/debug/namespace.go b/src/x/debug/namespace.go index 890830e077..df27dc9acc 100644 --- a/src/x/debug/namespace.go +++ b/src/x/debug/namespace.go @@ -24,6 +24,7 @@ import ( "bytes" "encoding/json" "io" + "net/http" clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/query/api/v1/handler/namespace" @@ -40,10 +41,11 @@ type namespaceInfoSource struct { // NewNamespaceInfoSource returns a Source for namespace information. func NewNamespaceInfoSource( - iopts instrument.Options, clusterClient clusterclient.Client, + instrumentOpts instrument.Options, ) Source { - handler := namespace.NewGetHandler(clusterClient, iopts) + handler := namespace.NewGetHandler(clusterClient, + instrumentOpts) return &namespaceInfoSource{ handler: handler, } @@ -51,7 +53,7 @@ func NewNamespaceInfoSource( // Write fetches data about the namespace and writes it in the given writer. // The data is formatted in json. -func (n *namespaceInfoSource) Write(w io.Writer) error { +func (n *namespaceInfoSource) Write(w io.Writer, _ *http.Request) error { nsRegistry, err := n.handler.Get() if err != nil { return err diff --git a/src/x/debug/namespace_test.go b/src/x/debug/namespace_test.go index f0fa93b2cd..0af409aae3 100644 --- a/src/x/debug/namespace_test.go +++ b/src/x/debug/namespace_test.go @@ -22,6 +22,7 @@ package debug import ( "bytes" + "net/http" "testing" "github.com/m3db/m3/src/x/instrument" @@ -32,9 +33,9 @@ import ( func TestNamespaceSource(t *testing.T) { _, mockClient := newHandlerOptsAndClient(t) iOpts := instrument.NewOptions() - n := NewNamespaceInfoSource(iOpts, mockClient) + n := NewNamespaceInfoSource(mockClient, iOpts) buff := bytes.NewBuffer([]byte{}) - n.Write(buff) + n.Write(buff, &http.Request{}) require.NotZero(t, buff.Len()) } diff --git a/src/x/debug/placement.go b/src/x/debug/placement.go index a4862ba075..2b5b02fc62 100644 --- a/src/x/debug/placement.go +++ b/src/x/debug/placement.go @@ -23,7 +23,9 @@ package debug import ( "fmt" "io" + "net/http" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/placement" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/x/instrument" @@ -32,32 +34,33 @@ import ( ) type placementInfoSource struct { - getHandler *placement.GetHandler - serviceName string + getHandler *placement.GetHandler + service handler.ServiceNameAndDefaults } // NewPlacementInfoSource returns a Source for placement information. func NewPlacementInfoSource( - iopts instrument.Options, + service handler.ServiceNameAndDefaults, placementOpts placement.HandlerOptions, - serviceName string, + iopts instrument.Options, ) (Source, error) { handler := placement.NewGetHandler(placementOpts) return &placementInfoSource{ - getHandler: handler, - serviceName: serviceName, + getHandler: handler, + service: service, }, nil } // Write fetches data about the placement and writes it in the given writer. // The data is formatted in json. -func (p *placementInfoSource) Write(w io.Writer) error { - placement, _, err := p.getHandler.Get(p.serviceName, nil) +func (p *placementInfoSource) Write(w io.Writer, httpReq *http.Request) error { + placement, _, err := p.getHandler.Get(p.service, httpReq) if err != nil { return err } if placement == nil { - return fmt.Errorf("placement does not exist for service: %s", p.serviceName) + return fmt.Errorf("placement does not exist for service: %s", + p.service.ServiceName) } placementProto, err := placement.Proto() diff --git a/src/x/debug/placement_test.go b/src/x/debug/placement_test.go index 19e476e380..ce121463b2 100644 --- a/src/x/debug/placement_test.go +++ b/src/x/debug/placement_test.go @@ -22,8 +22,10 @@ package debug import ( "bytes" + "net/http" "testing" + apihandler "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/x/instrument" "github.com/stretchr/testify/require" @@ -32,10 +34,13 @@ import ( func TestPlacementSource(t *testing.T) { handlerOpts, _ := newHandlerOptsAndClient(t) iOpts := instrument.NewOptions() - p, err := NewPlacementInfoSource(iOpts, handlerOpts, "m3db") + svcDefaults := apihandler.ServiceNameAndDefaults{ + ServiceName: "m3db", + } + p, err := NewPlacementInfoSource(svcDefaults, handlerOpts, iOpts) require.NoError(t, err) buff := bytes.NewBuffer([]byte{}) - p.Write(buff) + p.Write(buff, &http.Request{}) require.NotZero(t, buff.Len()) } diff --git a/src/x/debug/profile.go b/src/x/debug/profile.go index a5297c2c26..6f05a97b5f 100644 --- a/src/x/debug/profile.go +++ b/src/x/debug/profile.go @@ -23,6 +23,7 @@ package debug import ( "fmt" "io" + "net/http" "runtime/pprof" ) @@ -60,7 +61,7 @@ func (p *ProfileSource) Profile() *pprof.Profile { // Write writes a pprof-formatted snapshot of the profile to w. If a write to w // returns an error, Write returns that error. Otherwise, Write returns nil. -func (p *ProfileSource) Write(w io.Writer) error { +func (p *ProfileSource) Write(w io.Writer, _ *http.Request) error { prof := p.Profile() if err := prof.WriteTo(w, p.debug); err != nil { diff --git a/src/x/debug/profile_test.go b/src/x/debug/profile_test.go index 9be8bf21e0..75ac6be2df 100644 --- a/src/x/debug/profile_test.go +++ b/src/x/debug/profile_test.go @@ -22,6 +22,7 @@ package debug import ( "bytes" + "net/http" "testing" "github.com/stretchr/testify/require" @@ -56,7 +57,7 @@ func TestProfileSources(t *testing.T) { // Test zip writes buf := bytes.NewBuffer([]byte{}) - err = goProfSource.Write(buf) + err = goProfSource.Write(buf, &http.Request{}) require.NoError(t, err) require.NotZero(t, buf.Len()) }