From 0822312308183b4b84c9b6cd5e2bf3032b142108 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 4 Dec 2020 18:22:04 -0500 Subject: [PATCH] [coordinator] [query] Add readiness probe for probing current consistency level achievability (#2976) --- .../prometheus/test.sh | 10 + src/dbnode/client/client_mock.go | 90 ++++++++ src/dbnode/client/replicated_session.go | 8 + src/dbnode/client/session.go | 135 ++++++++--- src/dbnode/client/types.go | 6 + src/query/api/v1/handler/ready.go | 205 +++++++++++++++++ src/query/api/v1/handler/ready_test.go | 213 ++++++++++++++++++ src/query/api/v1/handler/types.go | 1 + src/query/api/v1/httpd/handler.go | 5 + src/query/storage/m3/storagemetadata/types.go | 9 + src/query/stores/m3db/async_session.go | 22 ++ 11 files changed, 673 insertions(+), 31 deletions(-) create mode 100644 src/query/api/v1/handler/ready.go create mode 100644 src/query/api/v1/handler/ready_test.go diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 543c42b2a2..c1d242fb30 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -37,6 +37,13 @@ setup_single_m3db_node echo "Start Prometheus containers" docker-compose -f ${COMPOSE_FILE} up -d prometheus01 +function test_readiness { + # Check readiness probe eventually succeeds + echo "Check readiness probe eventually succeeds" + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl --write-out "%{http_code}" --silent --output /dev/null 0.0.0.0:7201/ready) -eq "200" ]]' +} + function test_prometheus_remote_read { # Ensure Prometheus can proxy a Prometheus query echo "Wait until the remote write endpoint generates and allows for data to be queried" @@ -347,6 +354,9 @@ function test_series { '[[ $(curl -s 
"0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]' } +echo "Running readiness test" +test_readiness + echo "Running prometheus tests" test_prometheus_remote_read test_prometheus_remote_write_multi_namespaces diff --git a/src/dbnode/client/client_mock.go b/src/dbnode/client/client_mock.go index 541df13d09..acb2823680 100644 --- a/src/dbnode/client/client_mock.go +++ b/src/dbnode/client/client_mock.go @@ -155,6 +155,36 @@ func (m *MockSession) EXPECT() *MockSessionMockRecorder { return m.recorder } +// WriteClusterAvailability mocks base method +func (m *MockSession) WriteClusterAvailability() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteClusterAvailability") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WriteClusterAvailability indicates an expected call of WriteClusterAvailability +func (mr *MockSessionMockRecorder) WriteClusterAvailability() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteClusterAvailability", reflect.TypeOf((*MockSession)(nil).WriteClusterAvailability)) +} + +// ReadClusterAvailability mocks base method +func (m *MockSession) ReadClusterAvailability() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadClusterAvailability") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadClusterAvailability indicates an expected call of ReadClusterAvailability +func (mr *MockSessionMockRecorder) ReadClusterAvailability() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadClusterAvailability", reflect.TypeOf((*MockSession)(nil).ReadClusterAvailability)) +} + // Write mocks base method func (m *MockSession) Write(namespace, id ident.ID, t time.Time, value float64, unit time0.Unit, annotation []byte) error { 
m.ctrl.T.Helper() @@ -757,6 +787,36 @@ func (m *MockAdminSession) EXPECT() *MockAdminSessionMockRecorder { return m.recorder } +// WriteClusterAvailability mocks base method +func (m *MockAdminSession) WriteClusterAvailability() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteClusterAvailability") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WriteClusterAvailability indicates an expected call of WriteClusterAvailability +func (mr *MockAdminSessionMockRecorder) WriteClusterAvailability() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteClusterAvailability", reflect.TypeOf((*MockAdminSession)(nil).WriteClusterAvailability)) +} + +// ReadClusterAvailability mocks base method +func (m *MockAdminSession) ReadClusterAvailability() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadClusterAvailability") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadClusterAvailability indicates an expected call of ReadClusterAvailability +func (mr *MockAdminSessionMockRecorder) ReadClusterAvailability() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadClusterAvailability", reflect.TypeOf((*MockAdminSession)(nil).ReadClusterAvailability)) +} + // Write mocks base method func (m *MockAdminSession) Write(namespace, id ident.ID, t time.Time, value float64, unit time0.Unit, annotation []byte) error { m.ctrl.T.Helper() @@ -4398,6 +4458,36 @@ func (m *MockclientSession) EXPECT() *MockclientSessionMockRecorder { return m.recorder } +// WriteClusterAvailability mocks base method +func (m *MockclientSession) WriteClusterAvailability() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteClusterAvailability") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WriteClusterAvailability indicates an expected call of WriteClusterAvailability 
+func (mr *MockclientSessionMockRecorder) WriteClusterAvailability() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteClusterAvailability", reflect.TypeOf((*MockclientSession)(nil).WriteClusterAvailability)) +} + +// ReadClusterAvailability mocks base method +func (m *MockclientSession) ReadClusterAvailability() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadClusterAvailability") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadClusterAvailability indicates an expected call of ReadClusterAvailability +func (mr *MockclientSessionMockRecorder) ReadClusterAvailability() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadClusterAvailability", reflect.TypeOf((*MockclientSession)(nil).ReadClusterAvailability)) +} + // Write mocks base method func (m *MockclientSession) Write(namespace, id ident.ID, t time.Time, value float64, unit time0.Unit, annotation []byte) error { m.ctrl.T.Helper() diff --git a/src/dbnode/client/replicated_session.go b/src/dbnode/client/replicated_session.go index 5b2f95191e..8064a5df8e 100644 --- a/src/dbnode/client/replicated_session.go +++ b/src/dbnode/client/replicated_session.go @@ -190,6 +190,14 @@ func (s replicatedSession) replicate(params replicatedParams) error { return s.session.Write(params.namespace, params.id, params.t, params.value, params.unit, params.annotation) } +func (s *replicatedSession) ReadClusterAvailability() (bool, error) { + return s.session.ReadClusterAvailability() +} + +func (s *replicatedSession) WriteClusterAvailability() (bool, error) { + return s.session.WriteClusterAvailability() +} + // Write value to the database for an ID. 
func (s replicatedSession) Write(namespace, id ident.ID, t time.Time, value float64, unit xtime.Unit, annotation []byte) error { return s.replicate(replicatedParams{ diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index c6cd514ea5..51103f5c3d 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -683,8 +683,6 @@ func (s *session) hostQueues( newQueues = append(newQueues, newQueue) } - shards := topoMap.ShardSet().AllIDs() - minConnectionCount := s.opts.MinConnectionCount() replicas := topoMap.Replicas() majority := topoMap.MajorityReplicas() @@ -731,35 +729,25 @@ func (s *session) hostQueues( return nil, 0, 0, ErrClusterConnectTimeout } } - // Be optimistic - clusterAvailable := true - for _, shardID := range shards { - shardReplicasAvailable := 0 - routeErr := topoMap.RouteShardForEach(shardID, func(idx int, _ shard.Shard, _ topology.Host) { - if queues[idx].ConnectionCount() >= minConnectionCount { - shardReplicasAvailable++ - } - }) - if routeErr != nil { - return nil, 0, 0, routeErr - } - var clusterAvailableForShard bool - switch connectConsistencyLevel { - case topology.ConnectConsistencyLevelAll: - clusterAvailableForShard = shardReplicasAvailable == replicas - case topology.ConnectConsistencyLevelMajority: - clusterAvailableForShard = shardReplicasAvailable >= majority - case topology.ConnectConsistencyLevelOne: - clusterAvailableForShard = shardReplicasAvailable > 0 - default: - return nil, 0, 0, errSessionInvalidConnectClusterConnectConsistencyLevel - } - if !clusterAvailableForShard { - clusterAvailable = false - break - } + + var level topology.ConsistencyLevel + switch connectConsistencyLevel { + case topology.ConnectConsistencyLevelAll: + level = topology.ConsistencyLevelAll + case topology.ConnectConsistencyLevelMajority: + level = topology.ConsistencyLevelMajority + case topology.ConnectConsistencyLevelOne: + level = topology.ConsistencyLevelOne + default: + return nil, 0, 0, 
errSessionInvalidConnectClusterConnectConsistencyLevel } - if clusterAvailable { // All done + clusterAvailable, err := s.clusterAvailabilityWithQueuesAndMap(level, + queues, topoMap) + if err != nil { + return nil, 0, 0, err + } + if clusterAvailable { + // All done break } time.Sleep(clusterConnectWaitInterval) @@ -769,6 +757,86 @@ func (s *session) hostQueues( return queues, replicas, majority, nil } +func (s *session) WriteClusterAvailability() (bool, error) { + level := s.opts.WriteConsistencyLevel() + return s.clusterAvailability(level) +} + +func (s *session) ReadClusterAvailability() (bool, error) { + var convertedConsistencyLevel topology.ConsistencyLevel + level := s.opts.ReadConsistencyLevel() + switch level { + case topology.ReadConsistencyLevelNone: + // Already ready. + return true, nil + case topology.ReadConsistencyLevelOne: + convertedConsistencyLevel = topology.ConsistencyLevelOne + case topology.ReadConsistencyLevelUnstrictMajority: + convertedConsistencyLevel = topology.ConsistencyLevelOne + case topology.ReadConsistencyLevelMajority: + convertedConsistencyLevel = topology.ConsistencyLevelMajority + case topology.ReadConsistencyLevelUnstrictAll: + convertedConsistencyLevel = topology.ConsistencyLevelOne + case topology.ReadConsistencyLevelAll: + convertedConsistencyLevel = topology.ConsistencyLevelAll + default: + return false, fmt.Errorf("unknown consistency level: %d", level) + } + return s.clusterAvailability(convertedConsistencyLevel) +} + +func (s *session) clusterAvailability( + level topology.ConsistencyLevel, +) (bool, error) { + s.state.RLock() + queues := s.state.queues + topoMap, err := s.topologyMapWithStateRLock() + s.state.RUnlock() + if err != nil { + return false, err + } + return s.clusterAvailabilityWithQueuesAndMap(level, queues, topoMap) +} + +func (s *session) clusterAvailabilityWithQueuesAndMap( + level topology.ConsistencyLevel, + queues []hostQueue, + topoMap topology.Map, +) (bool, error) { + shards := 
topoMap.ShardSet().AllIDs() + minConnectionCount := s.opts.MinConnectionCount() + replicas := topoMap.Replicas() + majority := topoMap.MajorityReplicas() + + for _, shardID := range shards { + shardReplicasAvailable := 0 + routeErr := topoMap.RouteShardForEach(shardID, func(idx int, _ shard.Shard, _ topology.Host) { + if queues[idx].ConnectionCount() >= minConnectionCount { + shardReplicasAvailable++ + } + }) + if routeErr != nil { + return false, routeErr + } + var clusterAvailableForShard bool + switch level { + case topology.ConsistencyLevelAll: + clusterAvailableForShard = shardReplicasAvailable == replicas + case topology.ConsistencyLevelMajority: + clusterAvailableForShard = shardReplicasAvailable >= majority + case topology.ConsistencyLevelOne: + clusterAvailableForShard = shardReplicasAvailable > 0 + default: + return false, fmt.Errorf("unknown consistency level: %d", level) + } + if !clusterAvailableForShard { + return false, nil + } + } + + return true, nil +} + func (s *session) setTopologyWithLock(topoMap topology.Map, queues []hostQueue, replicas, majority int) { prevQueues := s.state.queues @@ -1879,9 +1947,14 @@ func (s *session) Replicas() int { func (s *session) TopologyMap() (topology.Map, error) { s.state.RLock() + topoMap, err := s.topologyMapWithStateRLock() + s.state.RUnlock() + return topoMap, err +} + +func (s *session) topologyMapWithStateRLock() (topology.Map, error) { status := s.state.status topoMap := s.state.topoMap - s.state.RUnlock() // Make sure the session is open, as thats what sets the initial topology. if status != statusOpen { diff --git a/src/dbnode/client/types.go b/src/dbnode/client/types.go index 3d4040bd7a..90dbb24efe 100644 --- a/src/dbnode/client/types.go +++ b/src/dbnode/client/types.go @@ -62,6 +62,12 @@ type Client interface { // Session can write and read to a cluster. type Session interface { + // WriteClusterAvailability returns whether cluster is available for writes. 
+ WriteClusterAvailability() (bool, error) + + // ReadClusterAvailability returns whether cluster is available for reads. + ReadClusterAvailability() (bool, error) + // Write value to the database for an ID. Write(namespace, id ident.ID, t time.Time, value float64, unit xtime.Unit, annotation []byte) error diff --git a/src/query/api/v1/handler/ready.go b/src/query/api/v1/handler/ready.go new file mode 100644 index 0000000000..4e3358bad4 --- /dev/null +++ b/src/query/api/v1/handler/ready.go @@ -0,0 +1,205 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package handler + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "strconv" + + "go.uber.org/zap" + + "github.com/m3db/m3/src/query/api/v1/options" + "github.com/m3db/m3/src/query/storage/m3" + "github.com/m3db/m3/src/query/util/logging" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + xhttp "github.com/m3db/m3/src/x/net/http" +) + +const ( + // ReadyURL is the url to check for readiness. + ReadyURL = "/ready" + + // ReadyHTTPMethod is the HTTP method used with this resource. + ReadyHTTPMethod = http.MethodGet +) + +// ReadyHandler tests whether the service is connected to underlying storage. +type ReadyHandler struct { + clusters m3.Clusters + instrumentOpts instrument.Options +} + +// NewReadyHandler returns a new instance of handler. +func NewReadyHandler(opts options.HandlerOptions) http.Handler { + return &ReadyHandler{ + clusters: opts.Clusters(), + instrumentOpts: opts.InstrumentOpts(), + } +} + +type readyResultNamespace struct { + ID string `json:"id"` + Attributes readyResultNamespaceAttributes `json:"attributes"` +} + +type readyResultNamespaceAttributes struct { + MetricsType string `json:"metricsType"` + Retention string `json:"retention"` + Resolution string `json:"resolution"` +} + +type readyResult struct { + ReadyReads []readyResultNamespace `json:"readyReads,omitempty"` + NotReadyReads []readyResultNamespace `json:"notReadyReads,omitempty"` + ReadyWrites []readyResultNamespace `json:"readyWrites,omitempty"` + NotReadyWrites []readyResultNamespace `json:"notReadyWrites,omitempty"` +} + +// ServeHTTP serves HTTP handler. This comment only here so doesn't break +// lint by not being "ServeHTTP" as the comment above this function +// which needs // nolint:gocyclo. 
+// nolint:gocyclo
+func (h *ReadyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	logger := logging.WithContext(r.Context(), h.instrumentOpts)
+
+	req, err := h.parseReadyRequestChecked(r)
+	if err != nil {
+		logger.Error("unable to parse ready request", zap.Error(err))
+		xhttp.Error(w, err, errorStatusCode(err))
+		return
+	}
+
+	var (
+		namespaces = h.clusters.ClusterNamespaces()
+		result     = &readyResult{}
+	)
+	for _, ns := range namespaces {
+		attrs := ns.Options().Attributes()
+		nsResult := readyResultNamespace{
+			ID: ns.NamespaceID().String(),
+			Attributes: readyResultNamespaceAttributes{
+				MetricsType: attrs.MetricsType.String(),
+				Retention:   attrs.Retention.String(),
+				Resolution:  attrs.Resolution.String(),
+			},
+		}
+
+		ready, err := ns.Session().ReadClusterAvailability()
+		if err != nil {
+			logger.Error("check read availability error", zap.Error(err))
+			xhttp.Error(w, err, errorStatusCode(err))
+			return
+		}
+		if !ready {
+			result.NotReadyReads = append(result.NotReadyReads, nsResult)
+		} else {
+			result.ReadyReads = append(result.ReadyReads, nsResult)
+		}
+
+		ready, err = ns.Session().WriteClusterAvailability()
+		if err != nil {
+			logger.Error("check write availability error", zap.Error(err))
+			xhttp.Error(w, err, errorStatusCode(err))
+			return
+		}
+		if !ready {
+			result.NotReadyWrites = append(result.NotReadyWrites, nsResult)
+		} else {
+			result.ReadyWrites = append(result.ReadyWrites, nsResult)
+		}
+	}
+
+	resp, err := json.Marshal(result)
+	if err != nil {
+		xhttp.Error(w, err, errorStatusCode(err))
+		return
+	}
+
+	notReady := (req.reads && len(result.NotReadyReads) > 0) ||
+		(req.writes && len(result.NotReadyWrites) > 0)
+	if notReady {
+		// Body is JSON, so label it as such before sending the failing status.
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusInternalServerError)
+		if _, werr := w.Write(resp); werr != nil {
+			logger.Error("unable to write ready response", zap.Error(werr))
+		}
+		return
+	}
+
+	xhttp.WriteJSONResponse(w, result, logger)
+}
+
+// readyRequest holds which availability checks decide the response status.
+type readyRequest struct {
+	reads  bool
+	writes bool
+}
+
+// parseReadyRequestChecked parses r, returning failures as invalid params errors.
+func (h *ReadyHandler) parseReadyRequestChecked(r *http.Request) (readyRequest, error) {
+	result, err := h.parseReadyRequest(r)
+	if err != nil {
+		// All request parsing errors should be treated as invalid params err.
+		return readyRequest{}, xerrors.NewInvalidParamsError(err)
+	}
+	return result, nil
+}
+
+// parseReadyRequest reads the optional "reads" and "writes" boolean query params.
+// Errors are returned as-is; parseReadyRequestChecked classifies them once.
+func (h *ReadyHandler) parseReadyRequest(r *http.Request) (readyRequest, error) {
+	// Default to checking for both read and write availability.
+	req := readyRequest{reads: true, writes: true}
+	var err error
+	if str := r.URL.Query().Get("reads"); str != "" {
+		req.reads, err = strconv.ParseBool(str)
+		if err != nil {
+			return readyRequest{}, err
+		}
+	}
+	if str := r.URL.Query().Get("writes"); str != "" {
+		req.writes, err = strconv.ParseBool(str)
+		if err != nil {
+			return readyRequest{}, err
+		}
+	}
+	return req, nil
+}
+
+// errorStatusCode maps an error to the HTTP status code used in the response.
+func errorStatusCode(err error) int {
+	switch v := err.(type) {
+	case *xhttp.ParseError:
+		return v.Code()
+	case error:
+		if xerrors.IsInvalidParams(v) {
+			return http.StatusBadRequest
+		} else if errors.Is(err, context.DeadlineExceeded) {
+			return http.StatusGatewayTimeout
+		}
+	}
+	return http.StatusInternalServerError
+}
diff --git a/src/query/api/v1/handler/ready_test.go b/src/query/api/v1/handler/ready_test.go
new file mode 100644
index 0000000000..b616a78ae0
--- /dev/null
+++ b/src/query/api/v1/handler/ready_test.go
@@ -0,0 +1,213 @@
+// Copyright (c) 2020 Uber Technologies, Inc.
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package handler + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/query/api/v1/options" + "github.com/m3db/m3/src/query/storage/m3" + "github.com/m3db/m3/src/x/ident" + xtest "github.com/m3db/m3/src/x/test" +) + +func TestReadyHandler(t *testing.T) { + tests := []struct { + name string + prepare func(session *client.MockSession) + queryString string + expectedStatusCode int + expectedResponse string + }{ + { + name: "healthy", + prepare: func(session *client.MockSession) { + session.EXPECT().ReadClusterAvailability().Return(true, nil) + session.EXPECT().WriteClusterAvailability().Return(true, nil) + }, + expectedStatusCode: http.StatusOK, + expectedResponse: `{ + "readyReads": [ + { + "attributes": { + "metricsType": "unaggregated", + "resolution": "0s", + "retention": "24h0m0s" + }, + "id": "test-ns" + } + ], + "readyWrites": [ + { + "attributes": { + "metricsType": "unaggregated", + "resolution": "0s", + "retention": "24h0m0s" + }, + "id": "test-ns" + } + ] + }`, + }, + { + name: "unhealthy", + prepare: func(session *client.MockSession) { + session.EXPECT().ReadClusterAvailability().Return(true, nil) + session.EXPECT().WriteClusterAvailability().Return(false, nil) + }, + expectedStatusCode: http.StatusInternalServerError, + expectedResponse: `{ + "readyReads": [ + { + "attributes": { + "metricsType": "unaggregated", + "resolution": "0s", + "retention": "24h0m0s" + }, + "id": "test-ns" + } + ], + "notReadyWrites": [ + { + "attributes": { + "metricsType": "unaggregated", + "resolution": "0s", + "retention": "24h0m0s" + }, + "id": "test-ns" + } + ] + }`, + }, + { + name: "healthy only reads", + prepare: func(session *client.MockSession) { + session.EXPECT().ReadClusterAvailability().Return(true, nil) + session.EXPECT().WriteClusterAvailability().Return(false, nil) + }, + 
queryString: "writes=false", + expectedStatusCode: http.StatusOK, + expectedResponse: `{ + "readyReads": [ + { + "attributes": { + "metricsType": "unaggregated", + "resolution": "0s", + "retention": "24h0m0s" + }, + "id": "test-ns" + } + ], + "notReadyWrites": [ + { + "attributes": { + "metricsType": "unaggregated", + "resolution": "0s", + "retention": "24h0m0s" + }, + "id": "test-ns" + } + ] + }`, + }, + { + name: "healthy only writes", + prepare: func(session *client.MockSession) { + session.EXPECT().ReadClusterAvailability().Return(false, nil) + session.EXPECT().WriteClusterAvailability().Return(true, nil) + }, + queryString: "reads=false", + expectedStatusCode: http.StatusOK, + expectedResponse: `{ + "notReadyReads": [ + { + "attributes": { + "metricsType": "unaggregated", + "resolution": "0s", + "retention": "24h0m0s" + }, + "id": "test-ns" + } + ], + "readyWrites": [ + { + "attributes": { + "metricsType": "unaggregated", + "resolution": "0s", + "retention": "24h0m0s" + }, + "id": "test-ns" + } + ] + }`, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + session := client.NewMockSession(ctrl) + + test.prepare(session) + + clusters, err := m3.NewClusters(m3.UnaggregatedClusterNamespaceDefinition{ + NamespaceID: ident.StringID("test-ns"), + Session: session, + Retention: 24 * time.Hour, + }) + require.NoError(t, err) + + opts := options.EmptyHandlerOptions().SetClusters(clusters) + readyHandler := NewReadyHandler(opts) + + w := httptest.NewRecorder() + url := ReadyURL + if test.queryString != "" { + url += fmt.Sprintf("?%s", test.queryString) + } + req := httptest.NewRequest(ReadyHTTPMethod, url, nil) + + readyHandler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + assert.NoError(t, err) + assert.Equal(t, test.expectedStatusCode, resp.StatusCode) + + expected := xtest.MustPrettyJSONString(t, 
test.expectedResponse) + actual := xtest.MustPrettyJSONString(t, string(body)) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) + }) + } +} diff --git a/src/query/api/v1/handler/types.go b/src/query/api/v1/handler/types.go index 54a6a605ae..d7828362c0 100644 --- a/src/query/api/v1/handler/types.go +++ b/src/query/api/v1/handler/types.go @@ -18,6 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// Package handler contains root level handlers. package handler // HeaderKeyType is the type for the header key. diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index eb115b2645..811827562e 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -226,6 +226,11 @@ func (h *Handler) RegisterRoutes() error { wrapped(m3json.NewWriteJSONHandler(h.options)).ServeHTTP, ).Methods(m3json.JSONWriteHTTPMethod) + // Readiness endpoint. + h.router.HandleFunc(handler.ReadyURL, + wrapped(handler.NewReadyHandler(h.options)).ServeHTTP, + ).Methods(handler.ReadyHTTPMethod) + // Tag completion endpoints. h.router.HandleFunc(native.CompleteTagsURL, wrapped(native.NewCompleteTagsHandler(h.options)).ServeHTTP, diff --git a/src/query/storage/m3/storagemetadata/types.go b/src/query/storage/m3/storagemetadata/types.go index 563f04ead2..9530db6275 100644 --- a/src/query/storage/m3/storagemetadata/types.go +++ b/src/query/storage/m3/storagemetadata/types.go @@ -21,6 +21,7 @@ package storagemetadata import ( + "fmt" "time" ) @@ -55,3 +56,11 @@ type Attributes struct { func (a Attributes) Validate() error { return ValidateMetricsType(a.MetricsType) } + +// String returns a string detailing the attributes. 
+func (a Attributes) String() string { + return fmt.Sprintf("type=%s, retention=%s, resolution=%s", + a.MetricsType.String(), + a.Retention.String(), + a.Resolution.String()) +} diff --git a/src/query/stores/m3db/async_session.go b/src/query/stores/m3db/async_session.go index 0e3aef6460..7203695697 100644 --- a/src/query/stores/m3db/async_session.go +++ b/src/query/stores/m3db/async_session.go @@ -93,6 +93,28 @@ func NewAsyncSession(fn NewClientFn, done chan<- struct{}) *AsyncSession { return asyncSession } +// ReadClusterAvailability returns whether the cluster is available for reads. +func (s *AsyncSession) ReadClusterAvailability() (bool, error) { + s.RLock() + defer s.RUnlock() + if s.err != nil { + return false, s.err + } + + return s.session.ReadClusterAvailability() +} + +// WriteClusterAvailability returns whether the cluster is available for writes. +func (s *AsyncSession) WriteClusterAvailability() (bool, error) { + s.RLock() + defer s.RUnlock() + if s.err != nil { + return false, s.err + } + + return s.session.WriteClusterAvailability() +} + // Write writes a value to the database for an ID. func (s *AsyncSession) Write(namespace, id ident.ID, t time.Time, value float64, unit xtime.Unit, annotation []byte) error {