Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove binary scalar operator number selector caching #98

Merged
merged 3 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,12 +810,19 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
query: `scalar(max(http_requests_total))`,
},
{
name: "scalar func with number",
name: "scalar func with aggr and number on right",
load: `load 30s
http_requests_total{pod="nginx-1"} 1+1x15
http_requests_total{pod="nginx-2"} 1+2x18`,
query: `scalar(max(http_requests_total)) + 10`,
},
{
name: "scalar func with aggr and number on left",
load: `load 30s
http_requests_total{pod="nginx-1"} 1+1x15
http_requests_total{pod="nginx-2"} 1+2x18`,
query: `10 + scalar(max(http_requests_total))`,
},
{
name: "clamp",
load: `load 30s
Expand Down Expand Up @@ -1436,17 +1443,24 @@ func TestInstantQuery(t *testing.T) {
{
name: "scalar func with aggr",
load: `load 30s
http_requests_total{pod="nginx-1"} 1+1x15
http_requests_total{pod="nginx-2"} 1+2x18`,
http_requests_total{pod="nginx-1"} 1+1x15
http_requests_total{pod="nginx-2"} 1+2x18`,
query: `scalar(max(http_requests_total))`,
},
{
name: "scalar func with number",
name: "scalar func with aggr and number on right",
load: `load 30s
http_requests_total{pod="nginx-1"} 1+1x15
http_requests_total{pod="nginx-2"} 1+2x18`,
http_requests_total{pod="nginx-1"} 1+1x15
http_requests_total{pod="nginx-2"} 1+2x18`,
query: `scalar(max(http_requests_total)) + 10`,
},
{
name: "scalar func with aggr and number on left",
load: `load 30s
http_requests_total{pod="nginx-1"} 1+1x15
http_requests_total{pod="nginx-2"} 1+2x18`,
query: `10 + scalar(max(http_requests_total))`,
},
{
name: "clamp",
load: `load 30s
Expand Down
66 changes: 37 additions & 29 deletions execution/binary/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package binary
import (
"context"
"fmt"
"math"
"sync"

"github.com/prometheus/prometheus/model/labels"
Expand All @@ -26,21 +27,20 @@ const (
type scalarOperator struct {
seriesOnce sync.Once
series []labels.Labels
scalar float64

pool *model.VectorPool
numberSelector model.VectorOperator
next model.VectorOperator
getOperands getOperandsFunc
operandValIdx int
operation operation
opName string

pool *model.VectorPool
scalar model.VectorOperator
next model.VectorOperator
getOperands getOperandsFunc
operandValIdx int
operation operation
opName string
}

func NewScalar(
pool *model.VectorPool,
next model.VectorOperator,
numberSelector model.VectorOperator,
scalar model.VectorOperator,
op parser.ItemType,
scalarSide ScalarSide,
) (*scalarOperator, error) {
Expand All @@ -57,28 +57,19 @@ func NewScalar(
operandValIdx = 1
}

// Cache the result of the number selector since it
// will not change during execution.
v, err := numberSelector.Next(context.Background())
if err != nil {
return nil, err
}
scalar := v[0].Samples[0]

return &scalarOperator{
pool: pool,
next: next,
scalar: scalar,
numberSelector: numberSelector,
operation: binaryOperation,
opName: parser.ItemTypeStr[op],
getOperands: getOperands,
operandValIdx: operandValIdx,
pool: pool,
next: next,
scalar: scalar,
operation: binaryOperation,
opName: parser.ItemTypeStr[op],
getOperands: getOperands,
operandValIdx: operandValIdx,
}, nil
}

func (o *scalarOperator) Explain() (me string, next []model.VectorOperator) {
return fmt.Sprintf("[*scalarOperator] %v %s", o.scalar, o.opName), []model.VectorOperator{o.next}
return fmt.Sprintf("[*scalarOperator] %s", o.opName), []model.VectorOperator{o.next, o.scalar}
}

func (o *scalarOperator) Series(ctx context.Context) ([]labels.Labels, error) {
Expand All @@ -103,11 +94,21 @@ func (o *scalarOperator) Next(ctx context.Context) ([]model.StepVector, error) {
return nil, err
}

scalarIn, err := o.scalar.Next(ctx)
if err != nil {
return nil, err
}

out := o.pool.GetVectorBatch()
for _, vector := range in {
for v, vector := range in {
step := o.pool.GetStepVector(vector.T)
for i := range vector.Samples {
operands := o.getOperands(vector, i, o.scalar)
scalarVal := math.NaN()
fpetkovski marked this conversation as resolved.
Show resolved Hide resolved
if len(scalarIn) > v && len(scalarIn[v].Samples) > 0 {
scalarVal = scalarIn[v].Samples[0]
}

operands := o.getOperands(vector, i, scalarVal)
val, keep := o.operation(operands, o.operandValIdx)
if !keep {
continue
Expand All @@ -121,7 +122,14 @@ func (o *scalarOperator) Next(ctx context.Context) ([]model.StepVector, error) {
out = append(out, step)
o.next.GetPool().PutStepVector(vector)
}

for i := range scalarIn {
o.scalar.GetPool().PutStepVector(scalarIn[i])
}

o.next.GetPool().PutVectors(in)
o.scalar.GetPool().PutVectors(scalarIn)
saswatamcode marked this conversation as resolved.
Show resolved Hide resolved

return out, nil
}

Expand Down