Skip to content

Commit

Permalink
[query] Refactor query code, add warnings to prom output (#2265)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Apr 17, 2020
1 parent 9d69cdd commit eea6199
Show file tree
Hide file tree
Showing 30 changed files with 1,105 additions and 928 deletions.
14 changes: 7 additions & 7 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (c QueryConfiguration) TimeoutOrDefault() time.Duration {
// instance. Limits are split between per-query and global limits.
type LimitsConfiguration struct {
// deprecated: use PerQuery.MaxComputedDatapoints instead.
DeprecatedMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`
DeprecatedMaxComputedDatapoints int `yaml:"maxComputedDatapoints"`

// Global configures limits which apply across all queries running on this
// instance.
Expand All @@ -232,7 +232,7 @@ type LimitsConfiguration struct {
// LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints. See
// LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints for a comment on
// the semantics.
func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 {
func (lc LimitsConfiguration) MaxComputedDatapoints() int {
if lc.PerQuery.PrivateMaxComputedDatapoints != 0 {
return lc.PerQuery.PrivateMaxComputedDatapoints
}
Expand All @@ -245,7 +245,7 @@ func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 {
type GlobalLimitsConfiguration struct {
// MaxFetchedDatapoints limits the total number of datapoints actually
// fetched by all queries at any given time.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`
MaxFetchedDatapoints int `yaml:"maxFetchedDatapoints"`
}

// AsLimitManagerOptions converts this configuration to
Expand All @@ -264,14 +264,14 @@ type PerQueryLimitsConfiguration struct {
// N.B.: the hacky "Private" prefix is to indicate that callers should use
// LimitsConfiguration.MaxComputedDatapoints() instead of accessing
// this field directly.
PrivateMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`
PrivateMaxComputedDatapoints int `yaml:"maxComputedDatapoints"`

// MaxFetchedDatapoints limits the number of datapoints actually used by a
// given query.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`
MaxFetchedDatapoints int `yaml:"maxFetchedDatapoints"`

// MaxFetchedSeries limits the number of time series returned by a storage node.
MaxFetchedSeries int64 `yaml:"maxFetchedSeries"`
MaxFetchedSeries int `yaml:"maxFetchedSeries"`
}

// AsLimitManagerOptions converts this configuration to
Expand All @@ -294,7 +294,7 @@ func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handleropti
}
}

func toLimitManagerOptions(limit int64) cost.LimitManagerOptions {
func toLimitManagerOptions(limit int) cost.LimitManagerOptions {
return cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{
Threshold: cost.Cost(limit),
Enabled: limit > 0,
Expand Down
74 changes: 37 additions & 37 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,36 +72,36 @@ func TestTagOptionsFromConfig(t *testing.T) {
assert.Equal(t, []byte(name), opts.MetricName())
}

func TestLimitsConfiguration_AsLimitManagerOptions(t *testing.T) {
func TestLimitsConfigurationAsLimitManagerOptions(t *testing.T) {
cases := []struct {
Input interface {
input interface {
AsLimitManagerOptions() cost.LimitManagerOptions
}
ExpectedDefault int64
expectedDefault int
}{{
Input: &PerQueryLimitsConfiguration{
input: &PerQueryLimitsConfiguration{
MaxFetchedDatapoints: 5,
},
ExpectedDefault: 5,
expectedDefault: 5,
}, {
Input: &GlobalLimitsConfiguration{
input: &GlobalLimitsConfiguration{
MaxFetchedDatapoints: 6,
},
ExpectedDefault: 6,
expectedDefault: 6,
}}

for _, tc := range cases {
t.Run(fmt.Sprintf("type_%T", tc.Input), func(t *testing.T) {
res := tc.Input.AsLimitManagerOptions()
t.Run(fmt.Sprintf("type_%T", tc.input), func(t *testing.T) {
res := tc.input.AsLimitManagerOptions()
assert.Equal(t, cost.Limit{
Threshold: cost.Cost(tc.ExpectedDefault),
Threshold: cost.Cost(tc.expectedDefault),
Enabled: true,
}, res.DefaultLimit())
})
}
}

func TestLimitsConfiguration_MaxComputedDatapoints(t *testing.T) {
func TestLimitsConfigurationMaxComputedDatapoints(t *testing.T) {
t.Run("uses PerQuery value if provided", func(t *testing.T) {
lc := &LimitsConfiguration{
DeprecatedMaxComputedDatapoints: 6,
Expand All @@ -110,49 +110,49 @@ func TestLimitsConfiguration_MaxComputedDatapoints(t *testing.T) {
},
}

assert.Equal(t, int64(5), lc.MaxComputedDatapoints())
assert.Equal(t, 5, lc.MaxComputedDatapoints())
})

t.Run("uses deprecated value if PerQuery not provided", func(t *testing.T) {
lc := &LimitsConfiguration{
DeprecatedMaxComputedDatapoints: 6,
}

assert.Equal(t, int64(6), lc.MaxComputedDatapoints())
assert.Equal(t, 6, lc.MaxComputedDatapoints())
})
}

func TestToLimitManagerOptions(t *testing.T) {
cases := []struct {
Name string
Input int64
ExpectedLimit cost.Limit
name string
input int
expectedLimit cost.Limit
}{{
Name: "negative is disabled",
Input: -5,
ExpectedLimit: cost.Limit{
name: "negative is disabled",
input: -5,
expectedLimit: cost.Limit{
Threshold: cost.Cost(-5),
Enabled: false,
},
}, {
Name: "zero is disabled",
Input: 0,
ExpectedLimit: cost.Limit{
name: "zero is disabled",
input: 0,
expectedLimit: cost.Limit{
Threshold: cost.Cost(0),
Enabled: false,
},
}, {
Name: "positive is enabled",
Input: 5,
ExpectedLimit: cost.Limit{
name: "positive is enabled",
input: 5,
expectedLimit: cost.Limit{
Threshold: cost.Cost(5),
Enabled: true,
},
}}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
assert.Equal(t, tc.ExpectedLimit, toLimitManagerOptions(tc.Input).DefaultLimit())
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expectedLimit, toLimitManagerOptions(tc.input).DefaultLimit())
})
}
}
Expand Down Expand Up @@ -185,25 +185,25 @@ func TestConfigValidation(t *testing.T) {

// limits configuration
limitsCfgCases := []struct {
Name string
Limit int64
name string
limit int
}{{
Name: "empty LimitsConfiguration is valid (implies disabled)",
Limit: 0,
name: "empty LimitsConfiguration is valid (implies disabled)",
limit: 0,
}, {
Name: "LimitsConfiguration with positive limit is valid",
Limit: 5,
name: "LimitsConfiguration with positive limit is valid",
limit: 5,
}, {}, {
Name: "LimitsConfiguration with negative limit is valid (implies disabled)",
Limit: -5,
name: "LimitsConfiguration with negative limit is valid (implies disabled)",
limit: -5,
}}

for _, tc := range limitsCfgCases {
t.Run(tc.Name, func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
cfg := baseCfg(t)
cfg.Limits = LimitsConfiguration{
PerQuery: PerQueryLimitsConfiguration{
PrivateMaxComputedDatapoints: tc.Limit,
PrivateMaxComputedDatapoints: tc.limit,
}}

assert.NoError(t, validator.Validate(cfg))
Expand Down
106 changes: 89 additions & 17 deletions src/query/api/v1/handler/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,101 @@ import (
"github.com/m3db/m3/src/x/instrument"
)

// CloseWatcher watches for CloseNotify and context timeout. It is best effort and may sometimes not close the channel relying on gc
// CancelWatcher is an interface that wraps a WatchForCancel method.
// TODO: make this generic middleware, rather than applied per function.
type CancelWatcher interface {
// WatchForCancel watches on the given context, and applies
// the given cancellation function.
WatchForCancel(context.Context, context.CancelFunc)
}

type canceller struct {
notifier http.CloseNotifier
iOpts instrument.Options
}

func (c *canceller) WatchForCancel(
ctx context.Context,
cancel context.CancelFunc,
) {
logger := logging.WithContext(ctx, c.iOpts)
notify := c.notifier.CloseNotify()
go func() {
// Wait for either the request to finish
// or for the client to disconnect
select {
case <-notify:
logger.Warn("connection closed by client")
cancel()
case <-ctx.Done():
// We only care about the time out case and not other cancellations
if ctx.Err() == context.DeadlineExceeded {
logger.Warn("request timed out")
}
}
}()
}

type ctxCanceller struct {
iOpts instrument.Options
}

func (c *ctxCanceller) WatchForCancel(
ctx context.Context, _ context.CancelFunc,
) {
logger := logging.WithContext(ctx, c.iOpts)
go func() {
select {
case <-ctx.Done():
// We only care about the time out case and not other cancellations
if ctx.Err() == context.DeadlineExceeded {
logger.Warn("request timed out")
}
}
}()
}

// NewResponseWriterCanceller creates a canceller on the given context with
// the given response writer.
func NewResponseWriterCanceller(
w http.ResponseWriter,
iOpts instrument.Options,
) CancelWatcher {
notifier, ok := w.(http.CloseNotifier)
if !ok {
return &ctxCanceller{iOpts: iOpts}
}

return &canceller{notifier: notifier, iOpts: iOpts}
}

// CloseWatcher watches for CloseNotify and context timeout.
// It is best effort and may sometimes not close the channel relying on GC.
func CloseWatcher(
ctx context.Context,
cancel context.CancelFunc,
w http.ResponseWriter,
instrumentOpts instrument.Options,
) {
notifier, ok := w.(http.CloseNotifier)
if !ok {
return
}

logger := logging.WithContext(ctx, instrumentOpts)
if notifier, ok := w.(http.CloseNotifier); ok {
notify := notifier.CloseNotify()
go func() {
// Wait for either the request to finish
// or for the client to disconnect
select {
case <-notify:
logger.Warn("connection closed by client")
cancel()
case <-ctx.Done():
// We only care about the time out case and not other cancellations
if ctx.Err() == context.DeadlineExceeded {
logger.Warn("request timed out")
}
notify := notifier.CloseNotify()
go func() {
// Wait for either the request to finish
// or for the client to disconnect
select {
case <-notify:
logger.Warn("connection closed by client")
cancel()
case <-ctx.Done():
// We only care about the time out case and not other cancellations
if ctx.Err() == context.DeadlineExceeded {
logger.Warn("request timed out")
}
}()
}
}
}()
}
14 changes: 12 additions & 2 deletions src/query/api/v1/handler/close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,17 @@ func TestCloseWatcher(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
w := httptest.NewRecorder()
CloseWatcher(ctx, cancel, w, instrument.NewOptions())
assert.Nil(t, ctx.Err())
assert.NoError(t, ctx.Err())
time.Sleep(100 * time.Millisecond)
assert.NotNil(t, ctx.Err())
assert.Error(t, ctx.Err())
}

func TestResponseWriteCanceller(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
w := httptest.NewRecorder()
canceller := NewResponseWriterCanceller(w, instrument.NewOptions())
canceller.WatchForCancel(ctx, cancel)
assert.NoError(t, ctx.Err())
time.Sleep(100 * time.Millisecond)
assert.Error(t, ctx.Err())
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/dbnode/kvconfig"
"github.com/m3db/m3/src/x/instrument"
xjson "github.com/m3db/m3/src/x/json"
xtest "github.com/m3db/m3/src/x/test"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -71,13 +72,14 @@ func TestConfigGetBootstrappersHandler(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

expectedResponse := `
{
"values": ["filesystem", "commitlog", "peers", "uninitialized_topology"]
expectedResp := xjson.Map{
"values": xjson.Array{"filesystem", "commitlog", "peers", "uninitialized_topology"},
}
`
assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))

expected := xtest.MustPrettyJSONMap(t, expectedResp)
actual := xtest.MustPrettyJSONString(t, string(body))

assert.Equal(t, expected, actual, xtest.Diff(expected, actual))
}

func TestConfigGetBootstrappersHandlerNotFound(t *testing.T) {
Expand Down
Loading

0 comments on commit eea6199

Please sign in to comment.