Skip to content

Commit

Permalink
Add remoteAddr to remote and json write endpoints (#1583)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Apr 24, 2019
1 parent 00625a4 commit 5451f5f
Show file tree
Hide file tree
Showing 25 changed files with 134 additions and 52 deletions.
6 changes: 3 additions & 3 deletions src/query/api/v1/handler/database/config_bootstrappers_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (h *configGetBootstrappersHandler) ServeHTTP(w http.ResponseWriter, r *http

store, err := h.client.KV()
if err != nil {
logger.Error("unable to get kv store", zap.Any("error", err))
logger.Error("unable to get kv store", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand All @@ -72,14 +72,14 @@ func (h *configGetBootstrappersHandler) ServeHTTP(w http.ResponseWriter, r *http
return
}
if err != nil {
logger.Error("unable to get kv key", zap.Any("error", err))
logger.Error("unable to get kv key", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

array := new(commonpb.StringArrayProto)
if err := value.Unmarshal(array); err != nil {
logger.Error("unable to unmarshal kv key", zap.Any("error", err))
logger.Error("unable to unmarshal kv key", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/api/v1/handler/database/config_bootstrappers_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@ func (h *configSetBootstrappersHandler) ServeHTTP(w http.ResponseWriter, r *http

value, rErr := h.parseRequest(r)
if rErr != nil {
logger.Error("unable to parse request", zap.Any("error", rErr))
logger.Error("unable to parse request", zap.Error(rErr))
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}

store, err := h.client.KV()
if err != nil {
logger.Error("unable to get kv store", zap.Any("error", err))
logger.Error("unable to get kv store", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

if _, err := store.Set(kvconfig.BootstrapperKey, value); err != nil {
logger.Error("unable to set kv key", zap.Any("error", err))
logger.Error("unable to set kv key", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/database/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

parsedReq, namespaceRequest, placementRequest, rErr := h.parseAndValidateRequest(r, currPlacement)
if rErr != nil {
logger.Error("unable to parse request", zap.Any("error", rErr))
logger.Error("unable to parse request", zap.Error(rErr))
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}
Expand Down Expand Up @@ -215,7 +215,7 @@ func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

placementProto, err := currPlacement.Proto()
if err != nil {
logger.Error("unable to get placement protobuf", zap.Any("error", err))
logger.Error("unable to get placement protobuf", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
10 changes: 8 additions & 2 deletions src/query/api/v1/handler/json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,18 @@ func (h *WriteJSONHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

writeQuery, err := newStorageWriteQuery(req)
if err != nil {
logging.WithContext(r.Context()).Error("Parsing error", zap.Any("err", err))
logger := logging.WithContext(r.Context())
logger.Error("parsing error",
zap.String("remoteAddr", r.RemoteAddr),
zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
}

if err := h.store.Write(r.Context(), writeQuery); err != nil {
logging.WithContext(r.Context()).Error("Write error", zap.Any("err", err))
logger := logging.WithContext(r.Context())
logger.Error("write error",
zap.String("remoteAddr", r.RemoteAddr),
zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
}
}
Expand Down
38 changes: 38 additions & 0 deletions src/query/api/v1/handler/json/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
package json

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"

Expand Down Expand Up @@ -96,3 +100,37 @@ func TestJSONWrite(t *testing.T) {
writeErr := jsonWrite.store.Write(context.TODO(), writeQuery)
require.NoError(t, writeErr)
}

func TestJSONWriteError(t *testing.T) {
logging.InitWithCores(nil)

expectedErr := fmt.Errorf("an error")

ctrl := gomock.NewController(t)
storage, session := m3.NewStorageAndSession(t, ctrl)
session.EXPECT().
WriteTagged(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Return(expectedErr)
session.EXPECT().IteratorPools().
Return(nil, nil).AnyTimes()

jsonWrite := &WriteJSONHandler{store: storage}

jsonReq := generateJSONWriteRequest()
req, err := http.NewRequest(JSONWriteHTTPMethod, WriteJSONURL,
strings.NewReader(jsonReq))
require.NoError(t, err)

writer := httptest.NewRecorder()
jsonWrite.ServeHTTP(writer, req)
resp := writer.Result()
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)

body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)

require.True(t, bytes.Contains(body, []byte(expectedErr.Error())),
fmt.Sprintf("body: %s", body))
}
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/namespace/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

md, rErr := h.parseRequest(r)
if rErr != nil {
logger.Error("unable to parse request", zap.Any("error", rErr))
logger.Error("unable to parse request", zap.Error(rErr))
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}
Expand All @@ -82,7 +82,7 @@ func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

logger.Error("unable to get namespace", zap.Any("error", err))
logger.Error("unable to get namespace", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/namespace/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func (h *DeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := logging.WithContext(ctx)
id := strings.TrimSpace(mux.Vars(r)[namespaceIDVar])
if id == "" {
logger.Error("no namespace ID to delete", zap.Any("error", errEmptyID))
logger.Error("no namespace ID to delete", zap.Error(errEmptyID))
xhttp.Error(w, errEmptyID, http.StatusBadRequest)
return
}

err := h.Delete(id)
if err != nil {
logger.Error("unable to delete namespace", zap.Any("error", err))
logger.Error("unable to delete namespace", zap.Error(err))
if err == errNamespaceNotFound {
xhttp.Error(w, err, http.StatusNotFound)
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/namespace/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (h *GetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
nsRegistry, err := h.Get()

if err != nil {
logger.Error("unable to get namespace", zap.Any("error", err))
logger.Error("unable to get namespace", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/openapi/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (h *DocHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
doc, err := assets.FSByte(false, docPath)

if err != nil {
logger.Error("unable to load doc", zap.Any("error", err))
logger.Error("unable to load doc", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/placement/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ func (h *AddHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *htt
if _, ok := err.(unsafeAddError); ok {
status = http.StatusBadRequest
}
logger.Error("unable to add placement", zap.Any("error", err))
logger.Error("unable to add placement", zap.Error(err))
xhttp.Error(w, err, status)
return
}

placementProto, err := placement.Proto()
if err != nil {
logger.Error("unable to get placement protobuf", zap.Any("error", err))
logger.Error("unable to get placement protobuf", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/api/v1/handler/placement/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (h *DeleteHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *
)

if id == "" {
logger.Error("no placement ID provided to delete", zap.Any("error", errEmptyID))
logger.Error("no placement ID provided to delete", zap.Error(errEmptyID))
xhttp.Error(w, errEmptyID, http.StatusBadRequest)
return
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (h *DeleteHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *
if force {
newPlacement, err = service.RemoveInstances(toRemove)
if err != nil {
logger.Error("unable to delete instances", zap.Any("error", err))
logger.Error("unable to delete instances", zap.Error(err))
xhttp.Error(w, err, http.StatusNotFound)
return
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func (h *DeleteHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *

placementProto, err := newPlacement.Proto()
if err != nil {
logger.Error("unable to get placement protobuf", zap.Any("error", err))
logger.Error("unable to get placement protobuf", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/placement/delete_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (h *DeleteAllHandler) ServeHTTP(serviceName string, w http.ResponseWriter,
}

if err := service.Delete(); err != nil {
logger.Error("unable to delete placement", zap.Any("error", err))
logger.Error("unable to delete placement", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/placement/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (h *GetHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *htt

placementProto, err := placement.Proto()
if err != nil {
logger.Error("unable to get placement protobuf", zap.Any("error", err))
logger.Error("unable to get placement protobuf", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/placement/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ func (h *InitHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *ht
xhttp.Error(w, err, http.StatusConflict)
return
}
logger.Error("unable to initialize placement", zap.Any("error", err))
logger.Error("unable to initialize placement", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

placementProto, err := placement.Proto()
if err != nil {
logger.Error("unable to get placement protobuf", zap.Any("error", err))
logger.Error("unable to get placement protobuf", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func parseParams(r *http.Request, timeoutOpts *prometheus.TimeoutOpts) (models.R
if endExclusiveVal != "" {
excludeEnd, err := strconv.ParseBool(endExclusiveVal)
if err != nil {
logging.WithContext(r.Context()).Warn("unable to parse end inclusive flag", zap.Any("error", err))
logging.WithContext(r.Context()).Warn("unable to parse end inclusive flag", zap.Error(err))
}

params.IncludeEnd = !excludeEnd
Expand Down
7 changes: 4 additions & 3 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
result, err := h.read(ctx, w, req, timeout)
if err != nil {
h.promReadMetrics.fetchErrorsServer.Inc(1)
logger.Error("unable to fetch data", zap.Any("error", err))
logger.Error("unable to fetch data", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand All @@ -112,7 +112,7 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
data, err := proto.Marshal(resp)
if err != nil {
h.promReadMetrics.fetchErrorsServer.Inc(1)
logger.Error("unable to marshal read results to protobuf", zap.Any("error", err))
logger.Error("unable to marshal read results to protobuf", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand All @@ -123,7 +123,8 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
compressed := snappy.Encode(nil, data)
if _, err := w.Write(compressed); err != nil {
h.promReadMetrics.fetchErrorsServer.Inc(1)
logger.Error("unable to encode read results to snappy", zap.Any("err", err))
logger.Error("unable to encode read results to snappy",
zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
5 changes: 4 additions & 1 deletion src/query/api/v1/handler/prometheus/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err := h.write(r.Context(), req)
if err != nil {
h.promWriteMetrics.writeErrorsServer.Inc(1)
logging.WithContext(r.Context()).Error("Write error", zap.Any("err", err))
logger := logging.WithContext(r.Context())
logger.Error("write error",
zap.String("remoteAddr", r.RemoteAddr),
zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
34 changes: 34 additions & 0 deletions src/query/api/v1/handler/prometheus/remote/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
package remote

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -30,6 +33,7 @@ import (
"github.com/m3db/m3/src/cmd/services/m3coordinator/ingest"
"github.com/m3db/m3/src/dbnode/x/metrics"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote/test"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/util/logging"
xclock "github.com/m3db/m3/src/x/clock"

Expand Down Expand Up @@ -72,6 +76,36 @@ func TestPromWrite(t *testing.T) {
require.NoError(t, writeErr)
}

func TestPromWriteError(t *testing.T) {
logging.InitWithCores(nil)

anError := fmt.Errorf("an error")

ctrl := gomock.NewController(t)
mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl)
mockDownsamplerAndWriter.EXPECT().
WriteBatch(gomock.Any(), gomock.Any()).
Return(anError)

promWrite, err := NewPromWriteHandler(mockDownsamplerAndWriter,
models.NewTagOptions(), tally.NoopScope)
require.NoError(t, err)

promReq := test.GeneratePromWriteRequest()
promReqBody := test.GeneratePromWriteRequestBody(t, promReq)
req, err := http.NewRequest("POST", PromWriteURL, promReqBody)
require.NoError(t, err)

writer := httptest.NewRecorder()
promWrite.ServeHTTP(writer, req)
resp := writer.Result()
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)

body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.True(t, bytes.Contains(body, []byte(anError.Error())))
}

func TestWriteErrorMetricCount(t *testing.T) {
logging.InitWithCores(nil)

Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ func (h *SearchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

query, rErr := h.parseBody(r)
if rErr != nil {
logger.Error("unable to parse request", zap.Any("error", rErr))
logger.Error("unable to parse request", zap.Error(rErr))
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}
opts := h.parseURLParams(r)

results, err := h.search(r.Context(), query, opts)
if err != nil {
logger.Error("unable to fetch data", zap.Any("error", err))
logger.Error("unable to fetch data", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}
Expand Down
Loading

0 comments on commit 5451f5f

Please sign in to comment.