Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get fetch timeout from db config #1342

Merged
merged 5 commits into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/query/api/v1/handler/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util"
"github.com/m3db/m3/src/query/util/json"
"github.com/m3db/m3/src/x/net/http"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/golang/snappy"
"github.com/gorilla/mux"
Expand All @@ -47,15 +47,18 @@ const (
filterNameTagsParam = "tag"
errFormatStr = "error parsing param: %s, error: %v"

// TODO: get timeouts from configs
maxTimeout = time.Minute
defaultTimeout = time.Second * 15
maxTimeout = 5 * time.Minute
)

var (
matchValues = []byte(".*")
)

// TimeoutOpts stores options related to various timeout configurations
type TimeoutOpts struct {
FetchTimeout time.Duration
}

// ParsePromCompressedRequest parses a snappy compressed request from Prometheus
func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) {
body := r.Body
Expand Down Expand Up @@ -83,10 +86,10 @@ func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) {
}

// ParseRequestTimeout parses the input request timeout with a default
func ParseRequestTimeout(r *http.Request) (time.Duration, error) {
func ParseRequestTimeout(r *http.Request, configFetchTimeout time.Duration) (time.Duration, error) {
timeout := r.Header.Get("timeout")
if timeout == "" {
return defaultTimeout, nil
return configFetchTimeout, nil
}

duration, err := time.ParseDuration(timeout)
Expand Down
8 changes: 4 additions & 4 deletions src/query/api/v1/handler/prometheus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,17 @@ func TestTimeoutParse(t *testing.T) {
req, _ := http.NewRequest("POST", "dummy", nil)
req.Header.Add("timeout", "1ms")

timeout, err := ParseRequestTimeout(req)
timeout, err := ParseRequestTimeout(req, time.Second)
assert.NoError(t, err)
assert.Equal(t, timeout, time.Millisecond)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this fail?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops never mind github hid the top bit


req.Header.Del("timeout")
timeout, err = ParseRequestTimeout(req)
timeout, err = ParseRequestTimeout(req, 2*time.Minute)
assert.NoError(t, err)
assert.Equal(t, timeout, defaultTimeout)
assert.Equal(t, timeout, 2*time.Minute)

req.Header.Add("timeout", "invalid")
_, err = ParseRequestTimeout(req)
_, err = ParseRequestTimeout(req, 15*time.Second)
assert.Error(t, err)
}

Expand Down
10 changes: 5 additions & 5 deletions src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/m3db/m3/src/query/util"
"github.com/m3db/m3/src/query/util/json"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/net/http"
xhttp "github.com/m3db/m3/src/x/net/http"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -83,12 +83,12 @@ func parseDuration(r *http.Request, key string) (time.Duration, error) {
}

// parseParams parses all params from the GET request
func parseParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) {
func parseParams(r *http.Request, timeoutOpts *prometheus.TimeoutOpts) (models.RequestParams, *xhttp.ParseError) {
params := models.RequestParams{
Now: time.Now(),
}

t, err := prometheus.ParseRequestTimeout(r)
t, err := prometheus.ParseRequestTimeout(r, timeoutOpts.FetchTimeout)
if err != nil {
return params, xhttp.NewParseError(err, http.StatusBadRequest)
}
Expand Down Expand Up @@ -179,14 +179,14 @@ func parseBlockType(r *http.Request) models.FetchedBlockType {
}

// parseInstantaneousParams parses all params from the GET request
func parseInstantaneousParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) {
func parseInstantaneousParams(r *http.Request, timeoutOpts *prometheus.TimeoutOpts) (models.RequestParams, *xhttp.ParseError) {
params := models.RequestParams{
Now: time.Now(),
Step: time.Second,
IncludeEnd: true,
}

t, err := prometheus.ParseRequestTimeout(r)
t, err := prometheus.ParseRequestTimeout(r, timeoutOpts.FetchTimeout)
if err != nil {
return params, xhttp.NewParseError(err, http.StatusBadRequest)
}
Expand Down
15 changes: 11 additions & 4 deletions src/query/api/v1/handler/prometheus/native/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/test"
"github.com/m3db/m3/src/query/ts"
Expand All @@ -42,6 +43,12 @@ const (
promQuery = `http_requests_total{job="prometheus",group="canary"}`
)

var (
timeoutOpts = &prometheus.TimeoutOpts{
FetchTimeout: 15 * time.Second,
}
)

func defaultParams() url.Values {
vals := url.Values{}
now := time.Now()
Expand All @@ -56,7 +63,7 @@ func TestParamParsing(t *testing.T) {
req, _ := http.NewRequest("GET", PromReadURL, nil)
req.URL.RawQuery = defaultParams().Encode()

r, err := parseParams(req)
r, err := parseParams(req, timeoutOpts)
require.Nil(t, err, "unable to parse request")
require.Equal(t, promQuery, r.Query)
}
Expand All @@ -69,7 +76,7 @@ func TestInstantaneousParamParsing(t *testing.T) {
params.Add(timeParam, now.Format(time.RFC3339))
req.URL.RawQuery = params.Encode()

r, err := parseInstantaneousParams(req)
r, err := parseInstantaneousParams(req, timeoutOpts)
require.Nil(t, err, "unable to parse request")
require.Equal(t, promQuery, r.Query)
}
Expand All @@ -79,7 +86,7 @@ func TestInvalidStart(t *testing.T) {
vals := defaultParams()
vals.Del(startParam)
req.URL.RawQuery = vals.Encode()
_, err := parseParams(req)
_, err := parseParams(req, timeoutOpts)
require.NotNil(t, err, "unable to parse request")
require.Equal(t, err.Code(), http.StatusBadRequest)
}
Expand All @@ -90,7 +97,7 @@ func TestInvalidTarget(t *testing.T) {
vals.Del(queryParam)
req.URL.RawQuery = vals.Encode()

p, err := parseParams(req)
p, err := parseParams(req, timeoutOpts)
require.NotNil(t, err, "unable to parse request")
assert.NotNil(t, p.Start)
require.Equal(t, err.Code(), http.StatusBadRequest)
Expand Down
6 changes: 5 additions & 1 deletion src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/executor"
"github.com/m3db/m3/src/query/models"
Expand Down Expand Up @@ -61,6 +62,7 @@ type PromReadHandler struct {
tagOpts models.TagOptions
limitsCfg *config.LimitsConfiguration
promReadMetrics promReadMetrics
timeoutOps *prometheus.TimeoutOpts
}

type promReadMetrics struct {
Expand Down Expand Up @@ -103,12 +105,14 @@ func NewPromReadHandler(
tagOpts models.TagOptions,
limitsCfg *config.LimitsConfiguration,
scope tally.Scope,
timeoutOpts *prometheus.TimeoutOpts,
) *PromReadHandler {
h := &PromReadHandler{
engine: engine,
tagOpts: tagOpts,
limitsCfg: limitsCfg,
promReadMetrics: newPromReadMetrics(scope),
timeoutOps: timeoutOpts,
}

h.promReadMetrics.maxDatapoints.Update(float64(limitsCfg.MaxComputedDatapoints))
Expand Down Expand Up @@ -143,7 +147,7 @@ func (h *PromReadHandler) ServeHTTPWithEngine(w http.ResponseWriter, r *http.Req
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
logger := logging.WithContext(ctx)

params, rErr := parseParams(r)
params, rErr := parseParams(r, h.timeoutOps)
if rErr != nil {
h.promReadMetrics.fetchErrorsClient.Inc(1)
return nil, emptyReqParams, &RespError{Err: rErr.Inner(), Code: rErr.Code()}
Expand Down
16 changes: 10 additions & 6 deletions src/query/api/v1/handler/prometheus/native/read_instantaneous.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
"net/http"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/executor"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/net/http"
xhttp "github.com/m3db/m3/src/x/net/http"

"go.uber.org/zap"
)
Expand All @@ -45,25 +46,28 @@ const (

// PromReadInstantHandler represents a handler for prometheus instantaneous read endpoint.
type PromReadInstantHandler struct {
engine *executor.Engine
tagOpts models.TagOptions
engine *executor.Engine
tagOpts models.TagOptions
timeoutOpts *prometheus.TimeoutOpts
}

// NewPromReadInstantHandler returns a new instance of handler.
func NewPromReadInstantHandler(
engine *executor.Engine,
tagOpts models.TagOptions,
timeoutOpts *prometheus.TimeoutOpts,
) *PromReadInstantHandler {
return &PromReadInstantHandler{
engine: engine,
tagOpts: tagOpts,
engine: engine,
tagOpts: tagOpts,
timeoutOpts: timeoutOpts,
}
}

func (h *PromReadInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
logger := logging.WithContext(ctx)
params, rErr := parseInstantaneousParams(r)
params, rErr := parseInstantaneousParams(r, h.timeoutOpts)
if rErr != nil {
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
Expand Down
3 changes: 2 additions & 1 deletion src/query/api/v1/handler/prometheus/native/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestPromReadHandler_Read(t *testing.T) {
req, _ := http.NewRequest("GET", PromReadURL, nil)
req.URL.RawQuery = defaultParams().Encode()

r, parseErr := parseParams(req)
r, parseErr := parseParams(req, timeoutOpts)
require.Nil(t, parseErr)
assert.Equal(t, models.FormatPromQL, r.FormatType)
seriesList, err := read(context.TODO(), promRead.engine, promRead.tagOpts, httptest.NewRecorder(), r)
Expand Down Expand Up @@ -130,6 +130,7 @@ func newTestSetup() *testSetup {
models.NewTagOptions(),
&config.LimitsConfiguration{},
tally.NewTestScope("", nil),
timeoutOpts,
),
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/m3db/m3/src/query/generated/proto/prompb"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/net/http"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
Expand All @@ -52,13 +52,15 @@ const (
type PromReadHandler struct {
engine *executor.Engine
promReadMetrics promReadMetrics
timeoutOpts *prometheus.TimeoutOpts
}

// NewPromReadHandler returns a new instance of handler.
func NewPromReadHandler(engine *executor.Engine, scope tally.Scope) http.Handler {
func NewPromReadHandler(engine *executor.Engine, scope tally.Scope, timeoutOpts *prometheus.TimeoutOpts) http.Handler {
return &PromReadHandler{
engine: engine,
promReadMetrics: newPromReadMetrics(scope),
timeoutOpts: timeoutOpts,
}
}

Expand Down Expand Up @@ -87,7 +89,7 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

timeout, err := prometheus.ParseRequestTimeout(r)
timeout, err := prometheus.ParseRequestTimeout(r, h.timeoutOpts.FetchTimeout)
if err != nil {
h.promReadMetrics.fetchErrorsClient.Inc(1)
xhttp.Error(w, err, http.StatusBadRequest)
Expand Down
38 changes: 32 additions & 6 deletions src/query/api/v1/handler/prometheus/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

xmetrics "github.com/m3db/m3/src/dbnode/x/metrics"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/executor"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
Expand All @@ -47,6 +48,10 @@ import (
var (
promReadTestMetrics = newPromReadMetrics(tally.NewTestScope("", nil))
defaultLookbackDuration = time.Minute

timeoutOpts = &prometheus.TimeoutOpts{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test to check that the timeout is being honoured?

FetchTimeout: 15 * time.Second,
}
)

func setupServer(t *testing.T) *httptest.Server {
Expand All @@ -58,13 +63,16 @@ func setupServer(t *testing.T) *httptest.Server {
FetchTagged(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, false, fmt.Errorf("not initialized"))
storage := test.NewSlowStorage(lstore, 10*time.Millisecond)
promRead := readHandler(storage)
promRead := readHandler(storage, timeoutOpts)
server := httptest.NewServer(test.NewSlowHandler(promRead, 10*time.Millisecond))
return server
}

func readHandler(store storage.Storage) *PromReadHandler {
return &PromReadHandler{engine: executor.NewEngine(store, tally.NewTestScope("test", nil), defaultLookbackDuration), promReadMetrics: promReadTestMetrics}
func readHandler(store storage.Storage, timeoutOpts *prometheus.TimeoutOpts) *PromReadHandler {
return &PromReadHandler{engine: executor.NewEngine(store, tally.NewTestScope("test", nil), defaultLookbackDuration),
promReadMetrics: promReadTestMetrics,
timeoutOpts: timeoutOpts,
}
}

func TestPromReadParsing(t *testing.T) {
Expand All @@ -79,11 +87,29 @@ func TestPromReadParsing(t *testing.T) {
require.Equal(t, len(r.Queries), 1)
}

func TestPromFetchTimeoutParsing(t *testing.T) {
logging.InitWithCores(nil)
ctrl := gomock.NewController(t)
storage, _ := m3.NewStorageAndSession(t, ctrl)
promRead := &PromReadHandler{
engine: executor.NewEngine(storage, tally.NewTestScope("test", nil), defaultLookbackDuration),
promReadMetrics: promReadTestMetrics,
timeoutOpts: &prometheus.TimeoutOpts{
FetchTimeout: 2 * time.Minute,
},
}

req, _ := http.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t))
dur, err := prometheus.ParseRequestTimeout(req, promRead.timeoutOpts.FetchTimeout)
require.NoError(t, err)
assert.Equal(t, 2*time.Minute, dur)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

Can you add a test with a default, and an invalid timeout value?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have these tests in the native package. Not sure we need to duplicate.

}

func TestPromReadParsingBad(t *testing.T) {
logging.InitWithCores(nil)
ctrl := gomock.NewController(t)
storage, _ := m3.NewStorageAndSession(t, ctrl)
promRead := readHandler(storage)
promRead := readHandler(storage, timeoutOpts)
req, _ := http.NewRequest("POST", PromReadURL, strings.NewReader("bad body"))
_, err := promRead.parseRequest(req)
require.NotNil(t, err, "unable to parse request")
Expand All @@ -97,7 +123,7 @@ func TestPromReadStorageWithFetchError(t *testing.T) {
Return(nil, true, fmt.Errorf("unable to get data"))
session.EXPECT().IteratorPools().
Return(nil, nil)
promRead := readHandler(storage)
promRead := readHandler(storage, timeoutOpts)
req := test.GeneratePromReadRequest()
_, err := promRead.read(context.TODO(), httptest.NewRecorder(), req, time.Hour)
require.NotNil(t, err, "unable to read from storage")
Expand Down Expand Up @@ -154,7 +180,7 @@ func TestReadErrorMetricsCount(t *testing.T) {
defer closer.Close()
readMetrics := newPromReadMetrics(scope)

promRead := &PromReadHandler{engine: executor.NewEngine(storage, scope, defaultLookbackDuration), promReadMetrics: readMetrics}
promRead := &PromReadHandler{engine: executor.NewEngine(storage, scope, defaultLookbackDuration), promReadMetrics: readMetrics, timeoutOpts: timeoutOpts}
req, _ := http.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t))
promRead.ServeHTTP(httptest.NewRecorder(), req)

Expand Down
Loading