Skip to content

Commit

Permalink
feat(stdlib): add vector float builtin (#5015)
Browse files Browse the repository at this point in the history
* feat(stdlib): initial built in impl

* fix(stdlib): remove array cases

* refactor(array): move conversion handling to array

* fix(stdlib): update _vectorizedFloat builtin param type

* fix(libflux): update build gen

* fix(stdlib): change v to vector

* fix(libflux): update build gen

* fix(interpreter): update window source location

* feat(stdlib): conv vec repeats to float

* fix(ast): handle VectorType marshaling

* fix: revert nil guard in NewVectorFromElements

Originally the thought was to pass nils along as-is but receivers
downstream expect to interrogate the values as a `Value`, so nils can
cause panics unless wrapped.

* refactor: tighten up VectorRepeatValue handling

* refactor: avoid building array if not needed

In the vectorized ToFloatConv a base case is when it receives an array
of floats (no conversion is needed). In this situation we can return the
input as-is.

Since the rest of the inputs will return brand new arrays, the
expectation seems to be that the caller would release the input. To mimic
this behavior for the reuse case, we tick up the refcount by calling
`.Retain()` before handing the array back to the caller.

Co-authored-by: Owen Nelson <[email protected]>
  • Loading branch information
OfTheDelmer and Owen Nelson authored Aug 12, 2022
1 parent def15ad commit acb06ec
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 35 deletions.
74 changes: 74 additions & 0 deletions array/array.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package array

import (
"strconv"

"github.com/apache/arrow/go/v7/arrow"
"github.com/apache/arrow/go/v7/arrow/array"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/memory"
)

//go:generate -command tmpl ../gotool.sh github.com/benbjohnson/tmpl
Expand Down Expand Up @@ -206,3 +209,74 @@ func Slice(arr Array, i, j int) Array {
err := errors.Newf(codes.Internal, "cannot slice array of type %T", arr)
panic(err)
}

// ToFloatConv converts arr into a *Float array, using mem for any newly built
// array.
//
// A *Float input is handed back unchanged after its reference count has been
// incremented. String, int, uint and boolean arrays are converted element by
// element into a new array: null elements stay null, strings are parsed with
// strconv.ParseFloat, and booleans become 1 (true) or 0 (false).
//
// An error with code Invalid is returned when a string element cannot be
// parsed or when the array's data type is none of the kinds listed above.
func ToFloatConv(mem memory.Allocator, arr Array) (*Float, error) {
	// Fast path: nothing to convert. Every other path returns a freshly built
	// array, which implies the caller releases the input afterwards, so take an
	// extra reference here to keep the returned array alive past that release.
	if floats, isFloat := arr.(*Float); isFloat {
		floats.Retain()
		return floats, nil
	}

	builder := NewFloatBuilder(mem)
	defer builder.Release()

	n := arr.Len()
	builder.Resize(n)

	// arrow.FLOAT64 never reaches this switch; it is covered by the fast path.
	switch arr.DataType().ID() {
	case arrow.STRING:
		strs := arr.(*String)
		for i := 0; i < n; i++ {
			if strs.IsNull(i) {
				builder.AppendNull()
				continue
			}

			s := strs.Value(i)
			f, err := strconv.ParseFloat(s, 64)
			if err != nil {
				return nil, errors.Newf(codes.Invalid, "cannot convert string %q to Float due to invalid syntax", s)
			}
			builder.Append(f)
		}
	case arrow.INT64:
		ints := arr.(*Int)
		for i := 0; i < n; i++ {
			if ints.IsNull(i) {
				builder.AppendNull()
				continue
			}
			builder.Append(float64(ints.Value(i)))
		}
	case arrow.UINT64:
		uints := arr.(*Uint)
		for i := 0; i < n; i++ {
			if uints.IsNull(i) {
				builder.AppendNull()
				continue
			}
			builder.Append(float64(uints.Value(i)))
		}
	case arrow.BOOL:
		bools := arr.(*Boolean)
		for i := 0; i < n; i++ {
			if bools.IsNull(i) {
				builder.AppendNull()
				continue
			}
			if bools.Value(i) {
				builder.Append(float64(1))
				continue
			}
			builder.Append(float64(0))
		}
	default:
		return nil, errors.Newf(codes.Invalid, "cannot convert %v to Float", arr.DataType().Name())
	}

	return builder.NewFloatArray(), nil
}
23 changes: 23 additions & 0 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (*UnsignedIntegerLiteral) node() {}
// node is an empty, unexported method declared on pointer receivers of the
// type-expression nodes listed here (VectorType included).
// NOTE(review): presumably a marker method that seals the Node interface to
// this package; the interface definition is outside this excerpt — confirm.
func (*NamedType) node() {}
func (*TvarType) node() {}
func (*ArrayType) node() {}
func (*VectorType) node() {}
func (*StreamType) node() {}
func (*DictType) node() {}
func (*RecordType) node() {}
Expand Down Expand Up @@ -246,6 +247,7 @@ func (StreamType) monotype() {}
// monotype is an empty, unexported method declared on value receivers of the
// type-expression nodes listed here (VectorType included).
// NOTE(review): presumably the marker method required by the MonoType
// interface; that definition is outside this excerpt — confirm.
func (DictType) monotype() {}
func (RecordType) monotype() {}
func (FunctionType) monotype() {}
func (VectorType) monotype() {}

type NamedType struct {
BaseNode
Expand Down Expand Up @@ -307,6 +309,27 @@ func (c *ArrayType) Copy() Node {
return nc
}

// VectorType is the AST node for a vector type expression. It embeds BaseNode
// and records the type of the vector's elements.
type VectorType struct {
BaseNode
// ElementType is the type of each element; it is serialized under the JSON
// key "element".
ElementType MonoType `json:"element"`
}

// Type returns the name of this node kind, the literal string "VectorType".
func (VectorType) Type() string {
return "VectorType"
}

// Copy returns a copy of the vector type node in which the embedded BaseNode
// and the element type have themselves been copied. A nil receiver is returned
// unchanged.
func (v *VectorType) Copy() Node {
	if v == nil {
		return v
	}
	nc := new(VectorType)
	*nc = *v
	nc.BaseNode = v.BaseNode.Copy()

	// ElementType is declared as a MonoType, so the copied element must be
	// asserted back to that interface. Asserting to *VectorType would panic for
	// every vector whose element is not itself a vector. A nil element type is
	// left nil instead of being dereferenced.
	if v.ElementType != nil {
		nc.ElementType = v.ElementType.Copy().(MonoType)
	}
	return nc
}

type StreamType struct {
BaseNode
ElementType MonoType `json:"element"`
Expand Down
1 change: 1 addition & 0 deletions ast/asttest/cmpopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ var IgnoreBaseNodeOptions = []cmp.Option{
cmpopts.IgnoreFields(ast.UnaryExpression{}, "BaseNode"),
cmpopts.IgnoreFields(ast.UnsignedIntegerLiteral{}, "BaseNode"),
cmpopts.IgnoreFields(ast.VariableAssignment{}, "BaseNode"),
cmpopts.IgnoreFields(ast.VectorType{}, "BaseNode"),
}
32 changes: 32 additions & 0 deletions ast/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,36 @@ func (nt *ArrayType) UnmarshalJSON(data []byte) error {
nt.ElementType = et
return nil
}
// MarshalJSON encodes the vector type as a JSON object whose "type" field is
// the value of vt.Type(), followed by the node's own fields.
func (vt VectorType) MarshalJSON() ([]byte, error) {
	// Alias has VectorType's fields but not its methods, so encoding it does not
	// re-enter this MarshalJSON.
	type Alias VectorType
	type tagged struct {
		Type string `json:"type"`
		Alias
	}
	return json.Marshal(tagged{
		Type:  vt.Type(),
		Alias: Alias(vt),
	})
}
// UnmarshalJSON decodes a vector type from JSON. The plain fields are decoded
// through a method-less alias, while the "element" field is captured raw and
// decoded separately with unmarshalMonotype before being stored in ElementType.
func (vt *VectorType) UnmarshalJSON(data []byte) error {
	type Alias VectorType
	var decoded struct {
		*Alias
		// Declared at the outer level so it takes precedence over the alias's
		// own "element" field and keeps the element undecoded for now.
		ElementType json.RawMessage `json:"element"`
	}
	err := json.Unmarshal(data, &decoded)
	if err != nil {
		return err
	}
	if base := decoded.Alias; base != nil {
		*vt = VectorType(*base)
	}
	elem, err := unmarshalMonotype(decoded.ElementType)
	if err != nil {
		return err
	}
	vt.ElementType = elem
	return nil
}

func (arr StreamType) MarshalJSON() ([]byte, error) {
type Alias StreamType
Expand Down Expand Up @@ -1615,6 +1645,8 @@ func unmarshalNode(msg json.RawMessage) (Node, error) {
node = new(ReturnStatement)
case "VariableAssignment":
node = new(VariableAssignment)
case "VectorType":
node = new(VectorType)
case "MemberAssignment":
node = new(MemberAssignment)
case "CallExpression":
Expand Down
4 changes: 2 additions & 2 deletions interpreter/interpreter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ func TestStack(t *testing.T) {
FunctionName: "window",
Location: ast.SourceLocation{
File: "universe/universe.flux",
Start: ast.Position{Line: 3803, Column: 12},
End: ast.Position{Line: 3803, Column: 51},
Start: ast.Position{Line: 3814, Column: 12},
End: ast.Position{Line: 3814, Column: 51},
Source: `window(every: inf, timeColumn: timeDst)`,
},
},
Expand Down
2 changes: 1 addition & 1 deletion libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/union_heterogeneous_test.flux": "7d8b47b3e96b859a5fed5985c051e2a3fdc947d3d6ff9cc104e40821581fb0cb",
"stdlib/universe/union_test.flux": "f008260d48db70212ce64d3f51f4cf031532a9a67c1ba43242dbc4d43ef31293",
"stdlib/universe/unique_test.flux": "4341d11d277edb94ab41dc98861ff9a97e34b53831e6b4aaeeac9ad26a1e707b",
"stdlib/universe/universe.flux": "022e5b6573225e8143330e8c9210100d1307bcb83337eb05f927bbfc02edc3b6",
"stdlib/universe/universe.flux": "fbd2d1feb3d52cad0b9fa73a5e202ff515f28643dc24ccacc36007274244279d",
"stdlib/universe/universe_truncateTimeColumn_test.flux": "8acb700c612e9eba87c0525b33fd1f0528e6139cc912ed844932caef25d37b56",
"stdlib/universe/window_aggregate_test.flux": "c8f66f7ee188bb2e979e5a8b526057b653922197ae441658f7c7f11251c96576",
"stdlib/universe/window_default_start_align_test.flux": "0aaf612796fbb5ac421579151ad32a8861f4494a314ea615d0ccedd18067b980",
Expand Down
110 changes: 78 additions & 32 deletions stdlib/universe/typeconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"time"
"unicode/utf8"

"github.com/influxdata/flux/array"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/internal/parser"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
Expand All @@ -24,17 +26,19 @@ func init() {
runtime.RegisterPackageValue("universe", "time", timeConv)
runtime.RegisterPackageValue("universe", "duration", durationConv)
runtime.RegisterPackageValue("universe", "bytes", byteConv)
runtime.RegisterPackageValue("universe", "_vectorizedFloat", vectorizedFloatConv)
}

var (
convBoolType = runtime.MustLookupBuiltinType("universe", "bool")
convIntType = runtime.MustLookupBuiltinType("universe", "int")
convUintType = runtime.MustLookupBuiltinType("universe", "uint")
convFloatType = runtime.MustLookupBuiltinType("universe", "float")
convStringType = runtime.MustLookupBuiltinType("universe", "string")
convTimeType = runtime.MustLookupBuiltinType("universe", "time")
convDurationType = runtime.MustLookupBuiltinType("universe", "duration")
convBytesType = runtime.MustLookupBuiltinType("universe", "bytes")
convBoolType = runtime.MustLookupBuiltinType("universe", "bool")
convIntType = runtime.MustLookupBuiltinType("universe", "int")
convUintType = runtime.MustLookupBuiltinType("universe", "uint")
convFloatType = runtime.MustLookupBuiltinType("universe", "float")
convStringType = runtime.MustLookupBuiltinType("universe", "string")
convTimeType = runtime.MustLookupBuiltinType("universe", "time")
convDurationType = runtime.MustLookupBuiltinType("universe", "duration")
convBytesType = runtime.MustLookupBuiltinType("universe", "bytes")
convVectorFloatType = runtime.MustLookupBuiltinType("universe", "_vectorizedFloat")
)

const (
Expand Down Expand Up @@ -174,40 +178,46 @@ var uintConv = values.NewFunction(
false,
)

// toFloatValue converts a single value to a float value based on the nature
// of its type: strings are parsed with strconv.ParseFloat, ints and uints are
// cast to float64, floats are re-wrapped as-is, and booleans become 1 (true)
// or 0 (false).
//
// An error with code Invalid is returned for a string that fails to parse and
// for any other nature.
func toFloatValue(v values.Value) (values.Value, error) {
	switch v.Type().Nature() {
	case semantic.String:
		parsed, err := strconv.ParseFloat(v.Str(), 64)
		if err != nil {
			return nil, errors.Newf(codes.Invalid, "cannot convert string %q to float due to invalid syntax", v.Str())
		}
		return values.NewFloat(parsed), nil
	case semantic.Int:
		return values.NewFloat(float64(v.Int())), nil
	case semantic.UInt:
		return values.NewFloat(float64(v.UInt())), nil
	case semantic.Float:
		return values.NewFloat(v.Float()), nil
	case semantic.Bool:
		if v.Bool() {
			return values.NewFloat(1), nil
		}
		return values.NewFloat(0), nil
	}

	return nil, errors.Newf(codes.Invalid, "cannot convert %v to float", v.Type())
}

// floatConv is the function value named "float". It reads the conversion
// argument, returns errMissingArg when that argument is absent, passes a null
// argument through as values.Null, and otherwise converts the argument with
// toFloatValue.
var floatConv = values.NewFunction(
	"float",
	convFloatType,
	func(ctx context.Context, args values.Object) (values.Value, error) {
		v, ok := args.Get(conversionArg)
		if !ok {
			return nil, errMissingArg
		}
		// Null is not converted; it stays null.
		if v.IsNull() {
			return values.Null, nil
		}

		// The per-type conversion lives in toFloatValue so that it is not
		// duplicated inline here.
		return toFloatValue(v)
	},
	false,
)
Expand Down Expand Up @@ -351,3 +361,39 @@ var byteConv = values.NewFunction(
},
false,
)

// vectorizedFloatConv is the function value named "_vectorizedFloat". It
// converts a vector argument into a vector of floats and returns an Invalid
// error for an argument of any other nature.
var vectorizedFloatConv = values.NewFunction(
	"_vectorizedFloat",
	convVectorFloatType,
	func(ctx context.Context, args values.Object) (values.Value, error) {
		arg, found := args.Get(conversionArg)
		if !found {
			return nil, errMissingArg
		}
		alloc := memory.GetAllocator(ctx)

		if arg.Type().Nature() != semantic.Vector {
			return nil, errors.Newf(codes.Invalid, "cannot convert %v to v[float]", arg.Type())
		}
		vec := arg.Vector()

		// A repeated (constant) vector holds a single value, so convert that
		// value with the row-based helper and wrap it back up as a repeat.
		if repeat, isRepeat := vec.(*values.VectorRepeatValue); isRepeat {
			converted, err := toFloatValue(repeat.Value())
			if err != nil {
				return nil, err
			}
			return values.NewVectorRepeatValue(converted), nil
		}

		// Otherwise convert the backing array as a whole.
		floats, err := array.ToFloatConv(alloc, vec.Arr())
		if err != nil {
			return nil, err
		}
		return values.NewFloatVectorValue(floats), nil
	},
	false,
)
Loading

0 comments on commit acb06ec

Please sign in to comment.