Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Fix time() function in binary comparisons #1888

Merged
merged 6 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 17 additions & 24 deletions src/query/block/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,68 +28,63 @@ import (
)

// Scalar is a block containing a single value over a certain bound
// This represents constant values; it greatly simplifies downstream operations by
// allowing them to treat this as a regular block, while at the same time
// having an option to optimize by accessing the scalar value directly instead
// This represents constant values; it greatly simplifies downstream operations
// by allowing them to treat this as a regular block, while at the same time
// having an option to optimize by accessing the scalar value directly instead.
type Scalar struct {
s ScalarFunc
val float64
meta Metadata
}

// NewScalar creates a scalar block containing val over the bounds
func NewScalar(
s ScalarFunc,
val float64,
bounds models.Bounds,
tagOptions models.TagOptions,
) Block {
return &Scalar{
s: s,
val: val,
meta: Metadata{
Bounds: bounds,
Tags: models.NewTags(0, tagOptions),
},
}
}

// Unconsolidated returns the unconsolidated version for the block
func (b *Scalar) Unconsolidated() (UnconsolidatedBlock, error) {
return nil, fmt.Errorf("unconsolidated view not implemented for scalar block, meta: %s", b.meta)
return nil,
fmt.Errorf("unconsolidated view not implemented for scalar block, meta: %s",
b.meta)
}

// WithMetadata updates this blocks metadata, and the metadatas for each series.
func (b *Scalar) WithMetadata(
meta Metadata,
_ []SeriesMeta,
) (Block, error) {
return &Scalar{
meta: meta,
s: b.s,
val: b.val,
}, nil
}

// StepIter returns a StepIterator
func (b *Scalar) StepIter() (StepIter, error) {
bounds := b.meta.Bounds
steps := bounds.Steps()
return &scalarStepIter{
meta: b.meta,
s: b.s,
vals: []float64{b.val},
numVals: steps,
idx: -1,
}, nil
}

// ScalarFunc determines the function to apply to generate the value at each step
type ScalarFunc func(t time.Time) float64

// SeriesIter returns a SeriesIterator
func (b *Scalar) SeriesIter() (SeriesIter, error) {
bounds := b.meta.Bounds
steps := bounds.Steps()
vals := make([]float64, steps)
t := bounds.Start
for i := range vals {
vals[i] = b.s(t)
vals[i] = b.val
t = t.Add(bounds.StepSize)
}

Expand All @@ -100,23 +95,21 @@ func (b *Scalar) SeriesIter() (SeriesIter, error) {
}, nil
}

// Close closes the scalar block
func (b *Scalar) Close() error { return nil }

// Value returns the value for the scalar block
func (b *Scalar) Value(t time.Time) float64 {
return b.s(t)
func (b *Scalar) Value() float64 {
return b.val
}

type scalarStepIter struct {
numVals, idx int
stepTime time.Time
err error
meta Metadata
s ScalarFunc
vals []float64
}

// build an empty SeriesMeta
// build an empty SeriesMetadata.
func buildSeriesMeta(meta Metadata) SeriesMeta {
return SeriesMeta{
Tags: models.NewTags(0, meta.Tags.Opts),
Expand Down Expand Up @@ -153,7 +146,7 @@ func (it *scalarStepIter) Next() bool {
func (it *scalarStepIter) Current() Step {
t := it.stepTime
return &scalarStep{
vals: []float64{it.s(t)},
vals: it.vals,
time: t,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/block/scalar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (

func TestScalarBlock(t *testing.T) {
block := NewScalar(
func(_ time.Time) float64 { return val },
val,
bounds,
models.NewTagOptions(),
)
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/binary/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func ArithmeticFunction(opType string, returnBool bool) (Function, error) {
return fn, nil
}

return nil, errNoMatching
return nil, fmt.Errorf("no arithmetic function found for type: %s", opType)
}

// NewOp creates a new binary operation.
Expand Down
11 changes: 3 additions & 8 deletions src/query/functions/binary/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
package binary

import (
"time"

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/functions/utils"
Expand Down Expand Up @@ -53,8 +51,7 @@ func processBinary(
return nil, errLeftScalar
}

lVal := scalarL.Value(time.Time{})

lVal := scalarL.Value()
// rhs is a series; use rhs metadata and series meta
if !params.RIsScalar {
return processSingleBlock(
Expand Down Expand Up @@ -82,9 +79,7 @@ func processBinary(
}

return block.NewScalar(
func(t time.Time) float64 {
return fn(lVal, scalarR.Value(t))
},
fn(lVal, scalarR.Value()),
lIter.Meta().Bounds,
lIter.Meta().Tags.Opts,
), nil
Expand All @@ -96,7 +91,7 @@ func processBinary(
return nil, errRightScalar
}

rVal := scalarR.Value(time.Time{})
rVal := scalarR.Value()
// lhs is a series; use lhs metadata and series meta.
return processSingleBlock(
queryCtx,
Expand Down
48 changes: 8 additions & 40 deletions src/query/functions/binary/binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,14 @@ func TestScalars(t *testing.T) {
err = node.Process(
models.NoopQueryContext(),
parser.NodeID(0),
block.NewScalar(
func(_ time.Time) float64 { return tt.lVal },
bounds,
models.NewTagOptions(),
),
block.NewScalar(tt.lVal, bounds, models.NewTagOptions()),
)

require.NoError(t, err)
err = node.Process(
models.NoopQueryContext(),
parser.NodeID(1),
block.NewScalar(
func(_ time.Time) float64 { return tt.rVal },
bounds,
models.NewTagOptions(),
),
block.NewScalar(tt.rVal, bounds, models.NewTagOptions()),
)

expected := [][]float64{{
Expand Down Expand Up @@ -178,22 +170,14 @@ func TestScalarsReturnBoolFalse(t *testing.T) {
err = node.Process(
models.NoopQueryContext(),
parser.NodeID(0),
block.NewScalar(
func(_ time.Time) float64 { return tt.lVal },
bounds,
models.NewTagOptions(),
),
block.NewScalar(tt.lVal, bounds, models.NewTagOptions()),
)

require.NoError(t, err)
err = node.Process(
models.NoopQueryContext(),
parser.NodeID(1),
block.NewScalar(
func(_ time.Time) float64 { return tt.rVal },
bounds,
models.NewTagOptions(),
),
block.NewScalar(tt.rVal, bounds, models.NewTagOptions()),
)

if tt.opType == EqType || tt.opType == NotEqType ||
Expand Down Expand Up @@ -584,23 +568,15 @@ func TestSingleSeriesReturnBool(t *testing.T) {
err = node.Process(
models.NoopQueryContext(),
parser.NodeID(1),
block.NewScalar(
func(_ time.Time) float64 { return tt.scalarVal },
bounds,
models.NewTagOptions(),
),
block.NewScalar(tt.scalarVal, bounds, models.NewTagOptions()),
)

require.NoError(t, err)
} else {
err = node.Process(
models.NoopQueryContext(),
parser.NodeID(0),
block.NewScalar(
func(_ time.Time) float64 { return tt.scalarVal },
bounds,
models.NewTagOptions(),
),
block.NewScalar(tt.scalarVal, bounds, models.NewTagOptions()),
)

require.NoError(t, err)
Expand Down Expand Up @@ -656,23 +632,15 @@ func TestSingleSeriesReturnValues(t *testing.T) {
err = node.Process(
models.NoopQueryContext(),
parser.NodeID(1),
block.NewScalar(
func(_ time.Time) float64 { return tt.scalarVal },
bounds,
models.NewTagOptions(),
),
block.NewScalar(tt.scalarVal, bounds, models.NewTagOptions()),
)

require.NoError(t, err)
} else {
err = node.Process(
models.NoopQueryContext(),
parser.NodeID(0),
block.NewScalar(
func(_ time.Time) float64 { return tt.scalarVal },
bounds,
models.NewTagOptions(),
),
block.NewScalar(tt.scalarVal, bounds, models.NewTagOptions()),
)

require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
package scalar

import (
"fmt"

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/models"
Expand All @@ -46,60 +44,52 @@ const (
TimeType = "time"
)

type baseOp struct {
fn block.ScalarFunc
tagOptions models.TagOptions
operatorType string
type scalarOp struct {
val float64
tagOptions models.TagOptions
}

func (o baseOp) OpType() string {
return o.operatorType
func (o scalarOp) OpType() string {
return ScalarType
}

func (o baseOp) String() string {
return fmt.Sprintf("type: %s.", o.OpType())
func (o scalarOp) String() string {
return "type: scalar"
}

func (o baseOp) Node(
func (o scalarOp) Node(
controller *transform.Controller,
opts transform.Options,
) parser.Source {
return &baseNode{
return &scalarNode{
op: o,
controller: controller,
opts: opts,
}
}

// NewScalarOp creates a new scalar op
// NewScalarOp creates an operation that yields a scalar source.
func NewScalarOp(
fn block.ScalarFunc,
opType string,
val float64,
tagOptions models.TagOptions,
) (parser.Params, error) {
if opType != ScalarType && opType != TimeType {
return nil, fmt.Errorf("unknown scalar type: %s", opType)
}

return &baseOp{
fn: fn,
tagOptions: tagOptions,
operatorType: opType,
return &scalarOp{
val: val,
tagOptions: tagOptions,
}, nil
}

// scalarNode is the execution node
type baseNode struct {
op baseOp
// scalarNode is the execution node for time source.
type scalarNode struct {
op scalarOp
controller *transform.Controller
opts transform.Options
}

// Execute runs the scalar node operation
func (n *baseNode) Execute(queryCtx *models.QueryContext) error {
// Execute runs the scalar source's pipeline.
func (n *scalarNode) Execute(queryCtx *models.QueryContext) error {
bounds := n.opts.TimeSpec().Bounds()

block := block.NewScalar(n.op.fn, bounds, n.op.tagOptions)
block := block.NewScalar(n.op.val, bounds, n.op.tagOptions)
if n.opts.Debug() {
// Ignore any errors
iter, _ := block.StepIter()
Expand Down
Loading