Skip to content

Commit

Permalink
fix(stdlib): vectorized map properly handles shadowed columns
Browse files Browse the repository at this point in the history
The vectorized version of map did not properly handle shadowed columns
the way the row based one worked. This copies over some of that code so
that it follows the correct logic. It also copies over the same logic
that is used to determine the column type when type inference does not
properly tell us the type.

This also fixes a bug in building objects that caused a bug in the
compiler. When a `with` is evaluated, the attributes from the original
record are copied over first and then overwritten values are assigned.
This overwriting did not properly release the values that were being
overwritten resulting in a memory leak.
  • Loading branch information
jsternberg committed Jun 2, 2022
1 parent 93cfab8 commit 9f8ab3a
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 71 deletions.
34 changes: 3 additions & 31 deletions execute/vector_fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/influxdata/flux"
"github.com/influxdata/flux/array"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/compiler"
"github.com/influxdata/flux/execute/table"
Expand Down Expand Up @@ -47,45 +46,18 @@ type vectorFn struct {
preparedFn
}

func (f *vectorFn) Eval(ctx context.Context, chunk table.Chunk) ([]array.Array, error) {
func (f *vectorFn) Eval(ctx context.Context, chunk table.Chunk) (values.Object, error) {
for j, col := range chunk.Cols() {
arr := chunk.Values(j)
arr.Retain()
v := values.NewVectorValue(arr, flux.SemanticType(col.Type))
f.arg0.Set(col.Label, v)
}
defer f.arg0.Release()

res, err := f.fn.Eval(ctx, f.args)
if err != nil {
return nil, err
}

// Map the return object to the expected order from type inference.
// The compiler should have done this by itself, but it doesn't at the moment.
// When the compiler gets refactored so it returns records in the same order
// as type inference, we can remove this and just do a copy by index.
retType := f.returnType()
n, err := retType.NumProperties()
if err != nil {
return nil, err
}

vs := make([]array.Array, n)
for i := 0; i < n; i++ {
prop, err := retType.RecordProperty(i)
if err != nil {
return nil, err
}

vec, ok := res.Object().Get(prop.Name())
if !ok || vec.IsNull() {
// Property does not exist because it is null.
continue
}
vs[i] = vec.(values.Vector).Arr()
vs[i].Retain()
}
res.Release()
f.arg0.Release()
return vs, nil
return res.Object(), nil
}
2 changes: 1 addition & 1 deletion libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/lowestAverage_test.flux": "3debee2f9c626a637b8842cf23d32f88ba1bf2bf98b53744e7c699a0c8f833cf",
"stdlib/universe/lowestCurrent_test.flux": "f9575bdb7e2ee3a37153b296d9e59fa69334e675f2de4f4023c0e7aec541aba2",
"stdlib/universe/lowestMin_test.flux": "389fa9ceb38d066ad18a8b21220d4b554d8c31d83d80f1694bc703a14f86a0b2",
"stdlib/universe/map_test.flux": "522e8caeb0a829481bf116c7c8c098866dc6bca571120e679ef89fce066557c2",
"stdlib/universe/map_test.flux": "a759c1e788d3f0ab7d45b17dad5f39d6faada53e4d1fdb0c2a193788f4bd9437",
"stdlib/universe/math_fn_constant_test.flux": "24091df7a982240ac5fdc4f6495d32a4a6078af1206d6d6cf51d2feb35abb7f7",
"stdlib/universe/math_m_max_test.flux": "9cc0aacc9a66209827b6ba83922fd6a291aa88205d86c5fa79fdce78be7599e0",
"stdlib/universe/max_test.flux": "3057f4d30f8c404af13b2f27ce13527c97ea77bf198c0f1a627f3b48673d4db8",
Expand Down
125 changes: 86 additions & 39 deletions stdlib/universe/map2.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,65 +400,112 @@ type mapVectorPreparedFunc struct {
fn *execute.VectorMapPreparedFn
}

func (m *mapVectorPreparedFunc) Eval(ctx context.Context, chunk table.Chunk, mem memory.Allocator) ([]flux.ColMeta, []array.Array, error) {
ret := m.fn.Type()
n, err := ret.NumProperties()
if err != nil {
return nil, nil, err
}
func (m *mapVectorPreparedFunc) createSchema(record values.Object) ([]flux.ColMeta, error) {
returnType := m.fn.Type()

arr, err := m.fn.Eval(ctx, chunk)
numProps, err := returnType.NumProperties()
if err != nil {
return nil, nil, err
return nil, err
}

nulls := 0
cols := make([]flux.ColMeta, n)
for i := range cols {
// This array was null so we will filter it out later.
if arr[i] == nil {
nulls++
continue
props := make(map[string]semantic.Nature, numProps)
// Deduplicate the properties in the return type.
// Scan properties in reverse order to ensure we only
// add visible properties to the list.
for i := numProps - 1; i >= 0; i-- {
prop, err := returnType.RecordProperty(i)
if err != nil {
return nil, err
}

prop, err := ret.RecordProperty(i)
typ, err := prop.TypeOf()
if err != nil {
return nil, nil, err
return nil, err
}
elemTyp, err := typ.ElemType()
if err != nil {
return nil, err
}
props[prop.Name()] = elemTyp.Nature()
}

typ, err := prop.TypeOf()
// Add columns from function in sorted order.
n, err := record.Type().NumProperties()
if err != nil {
return nil, err
}

keys := make([]string, 0, n)
for i := 0; i < n; i++ {
prop, err := record.Type().RecordProperty(i)
if err != nil {
return nil, nil, err
return nil, err
}
keys = append(keys, prop.Name())
}

if typ.Nature() != semantic.Vector {
return nil, nil, errors.Newf(codes.Internal, "column %s is not a vector", prop.Name())
cols := make([]flux.ColMeta, 0, len(keys))
for _, k := range keys {
v, ok := record.Get(k)
if !ok {
continue
}

elem, err := typ.ElemType()
if err != nil {
return nil, nil, err
nature := semantic.Invalid
if !v.IsNull() {
elemType, err := v.Type().ElemType()
if err != nil {
return nil, err
}
nature = elemType.Nature()
}

cols[i] = flux.ColMeta{
Label: prop.Name(),
Type: flux.ColumnType(elem),
if kind, ok := props[k]; ok && kind != semantic.Invalid {
nature = kind
}
if cols[i].Type == flux.TInvalid {
return nil, nil, errors.Newf(codes.FailedPrecondition, "column %s is not a basic type, is of type %s", prop.Name(), elem)
if nature == semantic.Invalid {
continue
}
ty := execute.ConvertFromKind(nature)
if ty == flux.TInvalid {
return nil, errors.Newf(codes.Invalid, `map object property "%s" is %v type which is not supported in a flux table`, k, nature)
}
cols = append(cols, flux.ColMeta{
Label: k,
Type: ty,
})
}
return cols, nil
}

func (m *mapVectorPreparedFunc) Eval(ctx context.Context, chunk table.Chunk, mem memory.Allocator) ([]flux.ColMeta, []array.Array, error) {
res, err := m.fn.Eval(ctx, chunk)
if err != nil {
return nil, nil, err
}
defer res.Release()

cols, err := m.createSchema(res)
if err != nil {
return nil, nil, err
}

if nulls > 0 {
newArrs := make([]array.Array, 0, len(arr)-nulls)
newCols := make([]flux.ColMeta, 0, cap(newArrs))
for i := range arr {
if arr[i] != nil {
newArrs = append(newArrs, arr[i])
newCols = append(newCols, cols[i])
var n int
arrs := make([]array.Array, len(cols))
for i, col := range cols {
if v, ok := res.Get(col.Label); ok && !v.IsNull() {
arr := v.Vector().Arr()
arr.Retain()
if n == 0 {
n = arr.Len()
}
arrs[i] = arr
}
cols, arr = newCols, newArrs
}
return cols, arr, nil

for i, col := range cols {
if arrs[i] == nil {
arrs[i] = arrow.Nulls(col.Type, n, mem)
}
}
return cols, arrs, nil
}
10 changes: 10 additions & 0 deletions stdlib/universe/map_test.flux
Original file line number Diff line number Diff line change
Expand Up @@ -519,3 +519,13 @@ testcase vectorize_or_operator {

testing.diff(want: want, got: got) |> yield()
}

testcase vectorize_with_operator_overwrite_attribute {
got = array.from(rows: [{ x: 1, y: "a" }])
|> map(fn: (r) => ({ r with x: r.x }))
|> drop(columns: ["y"])

want = array.from(rows: [{ x: 1 }])

testing.diff(got, want)
}
4 changes: 4 additions & 0 deletions values/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func BuildObjectWithSize(sz int, fn func(set ObjectSetter) error) (Object, error
if err := fn(func(k string, v Value) {
for i := 0; i < len(keys); i++ {
if keys[i] == k {
if values[i] != nil {
// Value overwritten. Usually due to a shadowed field.
values[i].Release()
}
values[i] = v
return
}
Expand Down

0 comments on commit 9f8ab3a

Please sign in to comment.