Skip to content

Commit

Permalink
Merge branch 'master' into r/add-logging-for-max-query-ids
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Aug 2, 2021
2 parents 61a06d0 + eb72a77 commit dec2d74
Show file tree
Hide file tree
Showing 19 changed files with 292 additions and 61 deletions.
5 changes: 5 additions & 0 deletions docker/m3coordinator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ RUN cd /go/src/github.com/m3db/m3/ && \
FROM alpine:3.11
LABEL maintainer="The M3DB Authors <[email protected]>"

# Provide timezone data to allow TZ environment variable to be set
# for parsing relative times such as "9am" correctly and respect
# the TZ environment variable.
RUN apk add --no-cache tzdata

EXPOSE 7201/tcp 7203/tcp

COPY --from=builder /go/src/github.com/m3db/m3/bin/m3coordinator /bin/
Expand Down
5 changes: 5 additions & 0 deletions docker/m3coordinator/development.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ LABEL maintainer="The M3DB Authors <[email protected]>"

EXPOSE 7201/tcp 7203/tcp

# Provide timezone data to allow TZ environment variable to be set
# for parsing relative times such as "9am" correctly and respect
# the TZ environment variable.
RUN apk add --no-cache tzdata

ADD ./m3coordinator /bin/m3coordinator
ADD ./config/m3coordinator-local-etcd.yml /etc/m3coordinator/m3coordinator.yml

Expand Down
5 changes: 4 additions & 1 deletion docker/m3dbnode/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ ENV GODEBUG madvdontneed=1

EXPOSE 2379/tcp 2380/tcp 7201/tcp 7203/tcp 9000-9004/tcp

RUN apk add --no-cache curl jq
# Provide timezone data to allow TZ environment variable to be set
# for parsing relative times such as "9am" correctly and respect
# the TZ environment variable.
RUN apk add --no-cache tzdata curl jq

COPY --from=builder /go/src/github.com/m3db/m3/src/dbnode/config/m3dbnode-local-etcd.yml /etc/m3dbnode/m3dbnode.yml
COPY --from=builder /go/src/github.com/m3db/m3/bin/m3dbnode \
Expand Down
5 changes: 4 additions & 1 deletion docker/m3dbnode/Dockerfile-setcap
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ COPY --from=builder /go/src/github.com/m3db/m3/bin/m3dbnode \

# Use setcap to set +e "effective" and +p "permitted" to adjust the SYS_RESOURCE
# so the process can raise the hard file limit with setrlimit.
RUN apk add --no-cache curl jq libcap && \
# Also provide timezone data to allow TZ environment variable to be set
# for parsing relative times such as "9am" correctly and respect
# the TZ environment variable.
RUN apk add --no-cache tzdata curl jq libcap && \
setcap cap_sys_resource=+ep /bin/m3dbnode

ENV GODEBUG madvdontneed=1
Expand Down
5 changes: 4 additions & 1 deletion docker/m3dbnode/development.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ LABEL maintainer="The M3DB Authors <[email protected]>"

ENV GODEBUG madvdontneed=1

RUN apk add --no-cache curl jq
# Provide timezone data to allow TZ environment variable to be set
# for parsing relative times such as "9am" correctly and respect
# the TZ environment variable.
RUN apk add --no-cache tzdata curl jq

# Add m3dbnode binary
ADD ./m3dbnode /bin/m3dbnode
Expand Down
5 changes: 5 additions & 0 deletions docker/m3query/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ LABEL maintainer="The M3DB Authors <[email protected]>"

EXPOSE 7201/tcp 7203/tcp

# Provide timezone data to allow TZ environment variable to be set
# for parsing relative times such as "9am" correctly and respect
# the TZ environment variable.
RUN apk add --no-cache tzdata

COPY --from=builder /go/src/github.com/m3db/m3/bin/m3query /bin/
COPY --from=builder /go/src/github.com/m3db/m3/src/query/config/m3query-local-etcd.yml /etc/m3query/m3query.yml

Expand Down
5 changes: 5 additions & 0 deletions docker/m3query/development.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ LABEL maintainer="The M3DB Authors <[email protected]>"

EXPOSE 7201/tcp 7203/tcp

# Provide timezone data to allow TZ environment variable to be set
# for parsing relative times such as "9am" correctly and respect
# the TZ environment variable.
RUN apk add --no-cache tzdata

ADD ./m3query /bin/m3query
ADD ./config/m3query-local-etcd.yml /etc/m3query/m3query.yml

Expand Down
2 changes: 2 additions & 0 deletions src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,10 @@ func (h *Handler) registerHealthEndpoints() error {
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(struct {
Uptime string `json:"uptime"`
Now string `json:"now"`
}{
Uptime: time.Since(h.options.CreatedAt()).String(),
Now: time.Now().String(),
})
}),
Methods: methods(http.MethodGet),
Expand Down
2 changes: 1 addition & 1 deletion src/query/graphite/graphite/timespec.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func ParseTime(s string, now time.Time, absoluteOffset time.Duration) (time.Time

n, err := strconv.ParseInt(s, 10, 64)
if err == nil {
return time.Unix(n, 0).UTC(), nil
return time.Unix(n, 0), nil
}

s = strings.Replace(strings.Replace(strings.ToLower(s), ",", "", -1), " ", "", -1)
Expand Down
11 changes: 7 additions & 4 deletions src/query/graphite/graphite/timespec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@ func TestParseTime(t *testing.T) {
}

for _, test := range tests {
s := test.timespec
parsed, err := ParseTime(s, relativeTo, 0)
assert.Nil(t, err, "error parsing %s", s)
assert.Equal(t, test.expectedTime, parsed, "incorrect parsed value for %s", s)
test := test
t.Run(test.timespec, func(t *testing.T) {
s := test.timespec
parsed, err := ParseTime(s, relativeTo, 0)
assert.Nil(t, err, "error parsing %s", s)
assert.True(t, test.expectedTime.Equal(parsed), "incorrect parsed value for %s", s)
})
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,19 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis
"divideSeriesLists both SeriesLists must have exactly the same length"))
return ts.NewSeriesList(), err
}

// If either list is not sorted yet then apply a default sort for deterministic results.
if !dividendSeriesList.SortApplied {
// Use sort.Stable for deterministic output.
sort.Stable(ts.SeriesByName(dividendSeriesList.Values))
dividendSeriesList.SortApplied = true
}
if !divisorSeriesList.SortApplied {
// Use sort.Stable for deterministic output.
sort.Stable(ts.SeriesByName(divisorSeriesList.Values))
divisorSeriesList.SortApplied = true
}

results := make([]*ts.Series, len(dividendSeriesList.Values))
for idx, dividendSeries := range dividendSeriesList.Values {
divisorSeries := divisorSeriesList.Values[idx]
Expand All @@ -273,6 +286,8 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis
}

r := ts.SeriesList(dividendSeriesList)
// Set sorted as we sorted any input that wasn't already sorted.
r.SortApplied = true
r.Values = results
return r, nil
}
Expand Down Expand Up @@ -558,6 +573,7 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te
}

for _, prefix := range prefixChunk {
prefix := prefix // Capture for lambda.
newTarget := strings.ReplaceAll(templateFunction, "%", prefix)
wg.Add(1)
go func() {
Expand Down
50 changes: 50 additions & 0 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,56 @@ func TestDivideSeriesLists(t *testing.T) {
require.Error(t, err)
}

// TestDivideSeriesListsWithUnsortedSeriesInput ensures that if input into
// the function wasn't sorted as input that it becomes sorted before dividing
// two series lists (to ensure deterministic results).
func TestDivideSeriesListsWithUnsortedSeriesInput(t *testing.T) {
start := time.Now().Truncate(time.Minute).Add(-10 * time.Minute)
end := start.Add(5 * time.Minute)
ctx := common.NewContext(common.ContextOptions{Start: start, End: end})

dividend := []*ts.Series{
ts.NewSeries(ctx, "a", start,
ts.NewConstantValues(ctx, 1, 5, 60000)),
ts.NewSeries(ctx, "c", start,
ts.NewConstantValues(ctx, 3, 5, 60000)),
ts.NewSeries(ctx, "b", start,
ts.NewConstantValues(ctx, 2, 5, 60000)),
}

divisor := []*ts.Series{
ts.NewSeries(ctx, "b", start,
ts.NewConstantValues(ctx, 2, 5, 60000)),
ts.NewSeries(ctx, "a", start,
ts.NewConstantValues(ctx, 1, 5, 60000)),
ts.NewSeries(ctx, "d", start,
ts.NewConstantValues(ctx, 3, 5, 60000)),
}

actual, err := divideSeriesLists(ctx, singlePathSpec{
Values: dividend,
}, singlePathSpec{
Values: divisor,
})
require.Nil(t, err)
expected := []common.TestSeries{
{
Name: "divideSeries(a,a)",
Data: []float64{1, 1, 1, 1, 1},
},
{
Name: "divideSeries(b,b)",
Data: []float64{1, 1, 1, 1, 1},
},
{
Name: "divideSeries(c,d)",
Data: []float64{1, 1, 1, 1, 1},
},
}

common.CompareOutputsAndExpected(t, 60000, start, expected, actual.Values)
}

//nolint:govet
func TestAverageSeriesWithWildcards(t *testing.T) {
ctx, _ := newConsolidationTestSeries()
Expand Down
35 changes: 32 additions & 3 deletions src/query/graphite/native/alias_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/ts"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/graphite/ts"
xgomock "github.com/m3db/m3/src/x/test"
)

func TestAlias(t *testing.T) {
Expand Down Expand Up @@ -250,3 +253,29 @@ func TestAliasByNodeWitCallSubExpressions(t *testing.T) {
assert.Equal(t, "foo01", results.Values[0].Name())
assert.Equal(t, "foo02", results.Values[1].Name())
}

// TestExecuteAliasByNodeAndTimeShift tests that the output of timeshift properly
// quotes the time shift arg so that it appears as a string and can be used to find
// the inner path expression without failing compilation when aliasByNode finds the
// first path element.
func TestExecuteAliasByNodeAndTimeShift(t *testing.T) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)

engine := NewEngine(store, CompileOptions{})

ctx := common.NewContext(common.ContextOptions{Start: time.Now().Add(-1 * time.Hour), End: time.Now(), Engine: engine})

stepSize := int((10 * time.Minute) / time.Millisecond)
store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
buildTestSeriesFn(stepSize, "foo.bar.q.zed", "foo.bar.g.zed",
"foo.bar.x.zed"))

expr, err := engine.Compile("aliasByNode(timeShift(foo.bar.*.zed,'-7d'), 0)")
require.NoError(t, err)

_, err = expr.Execute(ctx)
require.NoError(t, err)
}
35 changes: 31 additions & 4 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,13 @@ func averageAbove(ctx *common.Context, series singlePathSpec, n float64) (ts.Ser
return aboveByFunction(ctx, series, sr, n)
}

// averageBelow takes one metric or a wildcard seriesList followed by an floating point number n,
// returns only the metrics with an average value below n.
func averageBelow(ctx *common.Context, series singlePathSpec, n float64) (ts.SeriesList, error) {
sr := ts.SeriesReducerAvg.Reducer()
return belowByFunction(ctx, series, sr, n)
}

// currentAbove takes one metric or a wildcard seriesList followed by an floating point number n,
// returns only the metrics with the last value above n.
func currentAbove(ctx *common.Context, series singlePathSpec, n float64) (ts.SeriesList, error) {
Expand Down Expand Up @@ -375,7 +382,7 @@ func timeShift(
output := make([]*ts.Series, input.Len())
for i, in := range input.Values {
// NB(jayp): opposite direction
output[i] = in.Shift(-1 * shift).RenamedTo(fmt.Sprintf("timeShift(%s, %s)", in.Name(), timeShiftS))
output[i] = in.Shift(-1 * shift).RenamedTo(fmt.Sprintf("timeShift(%s,%q)", in.Name(), timeShiftS))
}
input.Values = output
return input, nil
Expand Down Expand Up @@ -455,7 +462,7 @@ func timeSlice(ctx *common.Context, inputPath singlePathSpec, start string, end
}

slicedSeries := ts.NewSeries(ctx, series.Name(), series.StartTime(), truncatedValues)
renamedSlicedSeries := slicedSeries.RenamedTo(fmt.Sprintf("timeSlice(%s, %s, %s)", slicedSeries.Name(), start, end))
renamedSlicedSeries := slicedSeries.RenamedTo(fmt.Sprintf("timeSlice(%s, %q, %q)", slicedSeries.Name(), start, end))
output = append(output, renamedSlicedSeries)
}
input.Values = output
Expand Down Expand Up @@ -1258,22 +1265,40 @@ func exclude(_ *common.Context, input singlePathSpec, pattern string) (ts.Series
// and raises the datapoint by the power of the constant provided at each point
// nolint: gocritic
func pow(ctx *common.Context, input singlePathSpec, factor float64) (ts.SeriesList, error) {
r := powHelper(ctx, input, factor, false)
return r, nil
}

// invert takes one metric or a wildcard seriesList, and inverts each datapoint (i.e. 1/x).
func invert(ctx *common.Context, input singlePathSpec) (ts.SeriesList, error) {
r := powHelper(ctx, input, -1, true)
return r, nil
}

func powHelper(ctx *common.Context, input singlePathSpec, factor float64, isInvert bool) ts.SeriesList {
results := make([]*ts.Series, 0, len(input.Values))

renamePrefix := "pow"
if isInvert {
renamePrefix = "invert"
}
for _, series := range input.Values {
numSteps := series.Len()
millisPerStep := series.MillisPerStep()
vals := ts.NewValues(ctx, millisPerStep, numSteps)
for i := 0; i < numSteps; i++ {
vals.SetValueAt(i, math.Pow(series.ValueAt(i), factor))
}
newName := fmt.Sprintf("pow(%s, %f)", series.Name(), factor)
newName := fmt.Sprintf("%s(%s, %f)", renamePrefix, series.Name(), factor)
if isInvert {
newName = fmt.Sprintf("%s(%s)", renamePrefix, series.Name())
}
results = append(results, ts.NewSeries(ctx, newName, series.StartTime(), vals))
}

r := ts.SeriesList(input)
r.Values = results
return r, nil
return r
}

// logarithm takes one metric or a wildcard seriesList, and draws the y-axis in
Expand Down Expand Up @@ -2790,6 +2815,7 @@ func init() {
2: nil, // total
})
MustRegisterFunction(averageAbove)
MustRegisterFunction(averageBelow)
MustRegisterFunction(averageSeries)
MustRegisterFunction(averageSeriesWithWildcards).WithDefaultParams(map[uint8]interface{}{
2: -1, // positions
Expand Down Expand Up @@ -2838,6 +2864,7 @@ func init() {
MustRegisterFunction(interpolate).WithDefaultParams(map[uint8]interface{}{
2: -1, // limit
})
MustRegisterFunction(invert)
MustRegisterFunction(isNonNull)
MustRegisterFunction(keepLastValue).WithDefaultParams(map[uint8]interface{}{
2: -1, // limit
Expand Down
Loading

0 comments on commit dec2d74

Please sign in to comment.