Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Refactor query code, add warnings to prom output #2265

Merged
merged 18 commits into from
Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (c QueryConfiguration) TimeoutOrDefault() time.Duration {
// instance. Limits are split between per-query and global limits.
type LimitsConfiguration struct {
// deprecated: use PerQuery.MaxComputedDatapoints instead.
DeprecatedMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`
DeprecatedMaxComputedDatapoints int `yaml:"maxComputedDatapoints"`

// Global configures limits which apply across all queries running on this
// instance.
Expand All @@ -232,7 +232,7 @@ type LimitsConfiguration struct {
// LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints. See
// LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints for a comment on
// the semantics.
func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 {
func (lc LimitsConfiguration) MaxComputedDatapoints() int {
if lc.PerQuery.PrivateMaxComputedDatapoints != 0 {
return lc.PerQuery.PrivateMaxComputedDatapoints
}
Expand All @@ -245,7 +245,7 @@ func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 {
type GlobalLimitsConfiguration struct {
// MaxFetchedDatapoints limits the total number of datapoints actually
// fetched by all queries at any given time.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`
MaxFetchedDatapoints int `yaml:"maxFetchedDatapoints"`
}

// AsLimitManagerOptions converts this configuration to
Expand All @@ -264,14 +264,14 @@ type PerQueryLimitsConfiguration struct {
// N.B.: the hacky "Private" prefix is to indicate that callers should use
// LimitsConfiguration.MaxComputedDatapoints() instead of accessing
// this field directly.
PrivateMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`
PrivateMaxComputedDatapoints int `yaml:"maxComputedDatapoints"`

// MaxFetchedDatapoints limits the number of datapoints actually used by a
// given query.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`
MaxFetchedDatapoints int `yaml:"maxFetchedDatapoints"`

// MaxFetchedSeries limits the number of time series returned by a storage node.
MaxFetchedSeries int64 `yaml:"maxFetchedSeries"`
MaxFetchedSeries int `yaml:"maxFetchedSeries"`
}

// AsLimitManagerOptions converts this configuration to
Expand All @@ -294,7 +294,7 @@ func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handleropti
}
}

func toLimitManagerOptions(limit int64) cost.LimitManagerOptions {
func toLimitManagerOptions(limit int) cost.LimitManagerOptions {
return cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{
Threshold: cost.Cost(limit),
Enabled: limit > 0,
Expand Down
74 changes: 37 additions & 37 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,36 +72,36 @@ func TestTagOptionsFromConfig(t *testing.T) {
assert.Equal(t, []byte(name), opts.MetricName())
}

func TestLimitsConfiguration_AsLimitManagerOptions(t *testing.T) {
func TestLimitsConfigurationAsLimitManagerOptions(t *testing.T) {
cases := []struct {
Input interface {
input interface {
AsLimitManagerOptions() cost.LimitManagerOptions
}
ExpectedDefault int64
expectedDefault int
}{{
Input: &PerQueryLimitsConfiguration{
input: &PerQueryLimitsConfiguration{
MaxFetchedDatapoints: 5,
},
ExpectedDefault: 5,
expectedDefault: 5,
}, {
Input: &GlobalLimitsConfiguration{
input: &GlobalLimitsConfiguration{
MaxFetchedDatapoints: 6,
},
ExpectedDefault: 6,
expectedDefault: 6,
}}

for _, tc := range cases {
t.Run(fmt.Sprintf("type_%T", tc.Input), func(t *testing.T) {
res := tc.Input.AsLimitManagerOptions()
t.Run(fmt.Sprintf("type_%T", tc.input), func(t *testing.T) {
res := tc.input.AsLimitManagerOptions()
assert.Equal(t, cost.Limit{
Threshold: cost.Cost(tc.ExpectedDefault),
Threshold: cost.Cost(tc.expectedDefault),
Enabled: true,
}, res.DefaultLimit())
})
}
}

func TestLimitsConfiguration_MaxComputedDatapoints(t *testing.T) {
func TestLimitsConfigurationMaxComputedDatapoints(t *testing.T) {
t.Run("uses PerQuery value if provided", func(t *testing.T) {
lc := &LimitsConfiguration{
DeprecatedMaxComputedDatapoints: 6,
Expand All @@ -110,49 +110,49 @@ func TestLimitsConfiguration_MaxComputedDatapoints(t *testing.T) {
},
}

assert.Equal(t, int64(5), lc.MaxComputedDatapoints())
assert.Equal(t, 5, lc.MaxComputedDatapoints())
})

t.Run("uses deprecated value if PerQuery not provided", func(t *testing.T) {
lc := &LimitsConfiguration{
DeprecatedMaxComputedDatapoints: 6,
}

assert.Equal(t, int64(6), lc.MaxComputedDatapoints())
assert.Equal(t, 6, lc.MaxComputedDatapoints())
})
}

func TestToLimitManagerOptions(t *testing.T) {
cases := []struct {
Name string
Input int64
ExpectedLimit cost.Limit
name string
input int
expectedLimit cost.Limit
}{{
Name: "negative is disabled",
Input: -5,
ExpectedLimit: cost.Limit{
name: "negative is disabled",
input: -5,
expectedLimit: cost.Limit{
Threshold: cost.Cost(-5),
Enabled: false,
},
}, {
Name: "zero is disabled",
Input: 0,
ExpectedLimit: cost.Limit{
name: "zero is disabled",
input: 0,
expectedLimit: cost.Limit{
Threshold: cost.Cost(0),
Enabled: false,
},
}, {
Name: "positive is enabled",
Input: 5,
ExpectedLimit: cost.Limit{
name: "positive is enabled",
input: 5,
expectedLimit: cost.Limit{
Threshold: cost.Cost(5),
Enabled: true,
},
}}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
assert.Equal(t, tc.ExpectedLimit, toLimitManagerOptions(tc.Input).DefaultLimit())
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expectedLimit, toLimitManagerOptions(tc.input).DefaultLimit())
})
}
}
Expand Down Expand Up @@ -185,25 +185,25 @@ func TestConfigValidation(t *testing.T) {

// limits configuration
limitsCfgCases := []struct {
Name string
Limit int64
name string
limit int
}{{
Name: "empty LimitsConfiguration is valid (implies disabled)",
Limit: 0,
name: "empty LimitsConfiguration is valid (implies disabled)",
limit: 0,
}, {
Name: "LimitsConfiguration with positive limit is valid",
Limit: 5,
name: "LimitsConfiguration with positive limit is valid",
limit: 5,
}, {}, {
Name: "LimitsConfiguration with negative limit is valid (implies disabled)",
Limit: -5,
name: "LimitsConfiguration with negative limit is valid (implies disabled)",
limit: -5,
}}

for _, tc := range limitsCfgCases {
t.Run(tc.Name, func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
cfg := baseCfg(t)
cfg.Limits = LimitsConfiguration{
PerQuery: PerQueryLimitsConfiguration{
PrivateMaxComputedDatapoints: tc.Limit,
PrivateMaxComputedDatapoints: tc.limit,
}}

assert.NoError(t, validator.Validate(cfg))
Expand Down
106 changes: 89 additions & 17 deletions src/query/api/v1/handler/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,101 @@ import (
"github.com/m3db/m3/src/x/instrument"
)

// CloseWatcher watches for CloseNotify and context timeout. It is best effort and may sometimes not close the channel relying on gc
// CancelWatcher is an interface that wraps a WatchForCancel method.
// TODO: make this generic middleware, rather than applied per function.
type CancelWatcher interface {
// WatchForCancel watches on the given context, and applies
// the given cancellation function.
WatchForCancel(context.Context, context.CancelFunc)
}

type canceller struct {
notifier http.CloseNotifier
iOpts instrument.Options
}

func (c *canceller) WatchForCancel(
ctx context.Context,
cancel context.CancelFunc,
) {
logger := logging.WithContext(ctx, c.iOpts)
notify := c.notifier.CloseNotify()
go func() {
// Wait for either the request to finish
// or for the client to disconnect
select {
case <-notify:
logger.Warn("connection closed by client")
cancel()
case <-ctx.Done():
// We only care about the time out case and not other cancellations
if ctx.Err() == context.DeadlineExceeded {
logger.Warn("request timed out")
}
}
}()
}

type ctxCanceller struct {
iOpts instrument.Options
}

func (c *ctxCanceller) WatchForCancel(
ctx context.Context, _ context.CancelFunc,
) {
logger := logging.WithContext(ctx, c.iOpts)
go func() {
select {
case <-ctx.Done():
// We only care about the time out case and not other cancellations
if ctx.Err() == context.DeadlineExceeded {
logger.Warn("request timed out")
}
}
}()
}

// NewResponseWriterCanceller creates a canceller on the given context with
// the given response writer.
func NewResponseWriterCanceller(
w http.ResponseWriter,
iOpts instrument.Options,
) CancelWatcher {
notifier, ok := w.(http.CloseNotifier)
if !ok {
return &ctxCanceller{iOpts: iOpts}
}

return &canceller{notifier: notifier, iOpts: iOpts}
}

// CloseWatcher watches for CloseNotify and context timeout.
// It is best effort and may sometimes not close the channel relying on GC.
func CloseWatcher(
ctx context.Context,
cancel context.CancelFunc,
w http.ResponseWriter,
instrumentOpts instrument.Options,
) {
notifier, ok := w.(http.CloseNotifier)
if !ok {
return
}

logger := logging.WithContext(ctx, instrumentOpts)
if notifier, ok := w.(http.CloseNotifier); ok {
notify := notifier.CloseNotify()
go func() {
// Wait for either the request to finish
// or for the client to disconnect
select {
case <-notify:
logger.Warn("connection closed by client")
cancel()
case <-ctx.Done():
// We only care about the time out case and not other cancellations
if ctx.Err() == context.DeadlineExceeded {
logger.Warn("request timed out")
}
notify := notifier.CloseNotify()
go func() {
// Wait for either the request to finish
// or for the client to disconnect
select {
case <-notify:
logger.Warn("connection closed by client")
cancel()
case <-ctx.Done():
// We only care about the time out case and not other cancellations
if ctx.Err() == context.DeadlineExceeded {
logger.Warn("request timed out")
}
}()
}
}
}()
}
14 changes: 12 additions & 2 deletions src/query/api/v1/handler/close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,17 @@ func TestCloseWatcher(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
w := httptest.NewRecorder()
CloseWatcher(ctx, cancel, w, instrument.NewOptions())
assert.Nil(t, ctx.Err())
assert.NoError(t, ctx.Err())
time.Sleep(100 * time.Millisecond)
assert.NotNil(t, ctx.Err())
assert.Error(t, ctx.Err())
}

func TestResponseWriteCanceller(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
w := httptest.NewRecorder()
canceller := NewResponseWriterCanceller(w, instrument.NewOptions())
canceller.WatchForCancel(ctx, cancel)
assert.NoError(t, ctx.Err())
time.Sleep(100 * time.Millisecond)
assert.Error(t, ctx.Err())
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/dbnode/kvconfig"
"github.com/m3db/m3/src/x/instrument"
xjson "github.com/m3db/m3/src/x/json"
xtest "github.com/m3db/m3/src/x/test"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -71,13 +72,14 @@ func TestConfigGetBootstrappersHandler(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

expectedResponse := `
{
"values": ["filesystem", "commitlog", "peers", "uninitialized_topology"]
expectedResp := xjson.Map{
"values": xjson.Array{"filesystem", "commitlog", "peers", "uninitialized_topology"},
}
`
assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))

expected := xtest.MustPrettyJSONMap(t, expectedResp)
actual := xtest.MustPrettyJSONString(t, string(body))

assert.Equal(t, expected, actual, xtest.Diff(expected, actual))
}

func TestConfigGetBootstrappersHandlerNotFound(t *testing.T) {
Expand Down
Loading