Skip to content

Commit

Permalink
[query] Fix query metrics assigning invalid status codes for errors
Browse files Browse the repository at this point in the history
  • Loading branch information
vpranckaitis committed Nov 30, 2020
1 parent e03804b commit cf75a26
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 27 deletions.
34 changes: 20 additions & 14 deletions src/query/api/experimental/annotated/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,18 @@ func NewHandler(

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
h.metrics.writeErrorsClient.Inc(1)
xhttp.WriteError(w, errEmptyBody)
err := errEmptyBody
h.metrics.incError(err)
xhttp.WriteError(w, err)
return
}
defer r.Body.Close()

req, err := parseRequest(r.Body)
if err != nil {
h.metrics.writeErrorsClient.Inc(1)
xhttp.WriteError(w, xhttp.NewError(err, http.StatusBadRequest))
resultError := xhttp.NewError(err, http.StatusBadRequest)
h.metrics.incError(resultError)
xhttp.WriteError(w, resultError)
return
}

Expand All @@ -97,20 +99,16 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
break
}

var (
status = http.StatusBadRequest
counter = h.metrics.writeErrorsClient
)
status := http.StatusBadRequest
if foundInternalErr {
status = http.StatusInternalServerError
counter = h.metrics.writeErrorsServer
}
counter.Inc(1)

err = fmt.Errorf(
"unable to write metric batch, encountered %d errors: %v", len(batchErr.Errors()), batchErr.Error(),
)
xhttp.WriteError(w, xhttp.NewError(err, status))
err = fmt.Errorf("unable to write metric batch, encountered %d errors: %w",
len(batchErr.Errors()), batchErr)
responseError := xhttp.NewError(err, status)
h.metrics.incError(responseError)
xhttp.WriteError(w, responseError)
return
}

Expand Down Expand Up @@ -150,3 +148,11 @@ func newHandlerMetrics(s tally.Scope) handlerMetrics {
writeErrorsClient: s.SubScope("write").Tagged(map[string]string{"code": "4XX"}).Counter("errors"),
}
}

func (m *handlerMetrics) incError(err error) {
if xhttp.IsClientError(err) {
m.writeErrorsClient.Inc(1)
} else {
m.writeErrorsServer.Inc(1)
}
}
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

parsedOptions, rErr := ParseRequest(ctx, r, h.instant, h.opts)
if rErr != nil {
h.promReadMetrics.fetchErrorsClient.Inc(1)
h.promReadMetrics.incError(rErr)
logger.Error("could not parse request", zap.Error(rErr))
xhttp.WriteError(w, rErr)
return
Expand All @@ -143,7 +143,7 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger.Error("range query error",
zap.Error(err),
zap.Any("parsedOptions", parsedOptions))
h.promReadMetrics.fetchErrorsServer.Inc(1)
h.promReadMetrics.incError(err)

xhttp.WriteError(w, err)
return
Expand Down
9 changes: 9 additions & 0 deletions src/query/api/v1/handler/prometheus/native/read_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/ts"
xerrors "github.com/m3db/m3/src/x/errors"
xhttp "github.com/m3db/m3/src/x/net/http"
xopentracing "github.com/m3db/m3/src/x/opentracing"

opentracinglog "github.com/opentracing/opentracing-go/log"
Expand All @@ -59,6 +60,14 @@ func newPromReadMetrics(scope tally.Scope) promReadMetrics {
}
}

func (m *promReadMetrics) incError(err error) {
if xhttp.IsClientError(err) {
m.fetchErrorsClient.Inc(1)
} else {
m.fetchErrorsServer.Inc(1)
}
}

// ReadResponse is the response that gets returned to the user
type ReadResponse struct {
Results []ts.Series `json:"results,omitempty"`
Expand Down
14 changes: 11 additions & 3 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ func newPromReadMetrics(scope tally.Scope) promReadMetrics {
}
}

func (m *promReadMetrics) incError(err error) {
if xhttp.IsClientError(err) {
m.fetchErrorsClient.Inc(1)
} else {
m.fetchErrorsServer.Inc(1)
}
}

func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
timer := h.promReadMetrics.fetchTimerSuccess.Start()
defer timer.Stop()
Expand All @@ -109,7 +117,7 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := logging.WithContext(ctx, h.opts.InstrumentOpts())
req, fetchOpts, rErr := ParseRequest(ctx, r, h.opts)
if rErr != nil {
h.promReadMetrics.fetchErrorsClient.Inc(1)
h.promReadMetrics.incError(rErr)
logger.Error("remote read query parse error",
zap.Error(rErr),
zap.Any("req", req),
Expand All @@ -121,7 +129,7 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cancelWatcher := handler.NewResponseWriterCanceller(w, h.opts.InstrumentOpts())
readResult, err := Read(ctx, cancelWatcher, req, fetchOpts, h.opts)
if err != nil {
h.promReadMetrics.fetchErrorsServer.Inc(1)
h.promReadMetrics.incError(err)
logger.Error("remote read query error",
zap.Error(err),
zap.Any("req", req),
Expand Down Expand Up @@ -183,7 +191,7 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if err != nil {
h.promReadMetrics.fetchErrorsServer.Inc(1)
h.promReadMetrics.incError(err)
} else {
h.promReadMetrics.fetchSuccess.Inc(1)
}
Expand Down
25 changes: 17 additions & 8 deletions src/query/api/v1/handler/prometheus/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ type promWriteMetrics struct {
forwardLatency tally.Histogram
}

func (m *promWriteMetrics) incError(err error) {
if xhttp.IsClientError(err) {
m.writeErrorsClient.Inc(1)
} else {
m.writeErrorsServer.Inc(1)
}
}

func newPromWriteMetrics(scope tally.Scope) (promWriteMetrics, error) {
upTo1sBuckets, err := tally.LinearDurationBuckets(0, 100*time.Millisecond, 10)
if err != nil {
Expand Down Expand Up @@ -263,7 +271,7 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

checkedReq, err := h.checkedParseRequest(r)
if err != nil {
h.metrics.writeErrorsClient.Inc(1)
h.metrics.incError(err)
xhttp.WriteError(w, err)
return
}
Expand Down Expand Up @@ -359,10 +367,8 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch {
case numBadRequest == len(errs):
status = http.StatusBadRequest
h.metrics.writeErrorsClient.Inc(1)
default:
status = http.StatusInternalServerError
h.metrics.writeErrorsServer.Inc(1)
}

logger := logging.WithContext(r.Context(), h.instrumentOpts)
Expand All @@ -374,20 +380,23 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
zap.String("lastRegularError", lastRegularErr),
zap.String("lastBadRequestErr", lastBadRequestErr))

var resultErr string
var resultErrMessage string
if lastRegularErr != "" {
resultErr = fmt.Sprintf("retryable_errors: count=%d, last=%s",
resultErrMessage = fmt.Sprintf("retryable_errors: count=%d, last=%s",
numRegular, lastRegularErr)
}
if lastBadRequestErr != "" {
var sep string
if lastRegularErr != "" {
sep = ", "
}
resultErr = fmt.Sprintf("%s%sbad_request_errors: count=%d, last=%s",
resultErr, sep, numBadRequest, lastBadRequestErr)
resultErrMessage = fmt.Sprintf("%s%sbad_request_errors: count=%d, last=%s",
resultErrMessage, sep, numBadRequest, lastBadRequestErr)
}
xhttp.WriteError(w, xhttp.NewError(errors.New(resultErr), status))

resultError := xhttp.NewError(errors.New(resultErrMessage), status)
h.metrics.incError(resultError)
xhttp.WriteError(w, resultError)
return
}

Expand Down
6 changes: 6 additions & 0 deletions src/x/net/http/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,9 @@ func getStatusCode(err error) int {
}
return http.StatusInternalServerError
}

// IsClientError returns true if this error would result in 4xx status code
func IsClientError(err error) bool {
code := getStatusCode(err)
return code >= 400 && code < 500
}

0 comments on commit cf75a26

Please sign in to comment.