From 7133d44bfaec1894452da314cf39c4d859049ada Mon Sep 17 00:00:00 2001 From: Sunil Kartikey Date: Mon, 21 Mar 2022 13:22:46 +0000 Subject: [PATCH] fix: Join function does not properly handle divergent schemas fixes: #4506 #4315 --- libflux/go/libflux/buildinfo.gen.go | 2 +- stdlib/universe/join.go | 57 +--- .../universe/join_mismatched_schema_test.flux | 244 +++++++++++++++++- stdlib/universe/join_test.go | 42 +-- 4 files changed, 269 insertions(+), 76 deletions(-) diff --git a/libflux/go/libflux/buildinfo.gen.go b/libflux/go/libflux/buildinfo.gen.go index 56b0ec240d..5d28e33afe 100644 --- a/libflux/go/libflux/buildinfo.gen.go +++ b/libflux/go/libflux/buildinfo.gen.go @@ -496,7 +496,7 @@ var sourceHashes = map[string]string{ "stdlib/universe/integral_test.flux": "a2b3d16ad20068af8ac7b960b152beb5d4a0319a2b07210467c8cfcdd55ed559", "stdlib/universe/join_across_measurements_test.flux": "61d69faa82616287ac875e8878e9c8095a46d638e0cecdc30f9db26763ca1eec", "stdlib/universe/join_agg_test.flux": "34b8eb508967c31d52dbe4a225d9b48c5ec1177e8021dc98267c5956f285ffdf", - "stdlib/universe/join_mismatched_schema_test.flux": "152ebc1fe1f3bf65ab84cf60582ac5b4618538a4c70eb729ab52a66183c58dbf", + "stdlib/universe/join_mismatched_schema_test.flux": "ec93ea26357783c15da7ac9be3b9b60d01122ad5fd482b802d5d9a6e7b3e5fb2", "stdlib/universe/join_missing_on_col_test.flux": "3d15e1bb1186ffbf39d68fb2698eca0812608a77be3592d5559fc2a3f5c0ae4a", "stdlib/universe/join_panic_test.flux": "d974919167b6bf8c0ce323f23f09aebccdad7ba4fbe1fd3cc1cd90402d9af28f", "stdlib/universe/join_test.flux": "7265656eb22c659f97e4ae15a4b6538f9081c2aef344a0131bcee09bdc6a61f8", diff --git a/stdlib/universe/join.go b/stdlib/universe/join.go index 0e7ba99f16..24c4534f62 100644 --- a/stdlib/universe/join.go +++ b/stdlib/universe/join.go @@ -297,7 +297,7 @@ func (t *mergeJoinTransformation) Process(id execute.DatasetID, tbl flux.Table) // Check if enough data sources have been seen to produce an output schema if !t.cache.isBufferEmpty(t.leftID) && !t.cache.isBufferEmpty(t.rightID) && !t.cache.postJoinSchemaBuilt() { - t.cache.buildPostJoinSchema() + t.cache.buildPostJoinSchema(t.cache.schemas[t.cache.leftID].columns, t.cache.schemas[t.cache.rightID].columns) } // Register any new output group keys that can be constructed from the new table @@ -799,9 +799,7 @@ func (c *MergeJoinCache) postJoinSchemaBuilt() bool { return c.schemaMap != nil } -func (c *MergeJoinCache) buildPostJoinSchema() { - left := c.schemas[c.leftID].columns - right := c.schemas[c.rightID].columns +func (c *MergeJoinCache) buildPostJoinSchema(left, right []flux.ColMeta) { // Find column names shared between the two tables shared := make(map[string]bool, len(left)) @@ -837,46 +835,6 @@ func (c *MergeJoinCache) buildPostJoinSchema() { } } -// Find indexes of column names missing between the two tables -func (c *MergeJoinCache) missingColIdx(lCols, rCols []flux.ColMeta) []int { - - var idxs []int - - if len(lCols) == 0 || len(rCols) == 0 { - return idxs - } - - trackIdx := make(map[int]struct{}) - - for _, lCol := range lCols { - column := tableCol{ - table: c.names[c.leftID], - col: lCol.Label, - } - newColumn := c.schemaMap[column] - newColumnIdx := c.colIndex[newColumn] - trackIdx[newColumnIdx] = struct{}{} - } - - for _, rCol := range rCols { - column := tableCol{ - table: c.names[c.rightID], - col: rCol.Label, - } - newColumn := c.schemaMap[column] - newColumnIdx := c.colIndex[newColumn] - trackIdx[newColumnIdx] = struct{}{} - } - - for _, idx := range c.colIndex { - if _, ok := trackIdx[idx]; !ok { - idxs = append(idxs, idx) - } - } - - return idxs -} - func (c *MergeJoinCache) join(left, right *execute.ColListTableBuilder) (flux.Table, error) { // Sort input tables left.Sort(c.order, false) @@ -888,6 +846,9 @@ func (c *MergeJoinCache) join(left, right *execute.ColListTableBuilder) (flux.Ta leftSet, leftKey = c.advance(leftSet.Stop, left) rightSet, rightKey = c.advance(rightSet.Stop, right) + // Build the output table, this will deal with the cases where tables in stream have different schemas + c.buildPostJoinSchema(left.Cols(), right.Cols()) + keys := map[execute.DatasetID]flux.GroupKey{ c.leftID: left.Key(), c.rightID: right.Key(), @@ -904,9 +865,6 @@ func (c *MergeJoinCache) join(left, right *execute.ColListTableBuilder) (flux.Ta } } - // find missing coloumn indexes between two tables - missingIdxs := c.missingColIdx(left.Cols(), right.Cols()) - // Perform sort merge join for !leftSet.Empty() && !rightSet.Empty() { if leftKey.EqualTrueNulls(rightKey) { @@ -963,11 +921,6 @@ func (c *MergeJoinCache) join(left, right *execute.ColListTableBuilder) (flux.Ta if err != nil { return nil, err } - - // append nil for the missing column in the result table - for _, mIdx := range missingIdxs { - _ = builder.AppendNil(mIdx) - } } } leftSet, leftKey = c.advance(leftSet.Stop, left) diff --git a/stdlib/universe/join_mismatched_schema_test.flux b/stdlib/universe/join_mismatched_schema_test.flux index b336f13618..7bf6100602 100644 --- a/stdlib/universe/join_mismatched_schema_test.flux +++ b/stdlib/universe/join_mismatched_schema_test.flux @@ -2,6 +2,7 @@ package universe_test import "csv" +import "array" import "testing" import "internal/debug" @@ -38,7 +39,8 @@ b = ", ) -testcase normal { +// left stream's, second table is missing 'key' columns +testcase missing_column_on_left_stream { got = join(tables: {a, b}, on: ["_time"]) |> debug.slurp() @@ -54,14 +56,246 @@ testcase normal { ,,0,2021-01-01T00:00:00Z,1.0,10.0,foo, ,,0,2021-01-01T00:01:00Z,2.0,20.0,foo, +#datatype,string,long,dateTime:RFC3339,double,double,string +#group,false,false,false,false,false,true +#default,_result,,,,, +,result,table,_time,_value_a,_value_b,key +,,1,2021-01-01T00:00:00Z,1.5,10.0, +,,1,2021-01-01T00:01:00Z,2.5,20.0, +", + ) + + testing.diff(got, want) |> yield() + } + +a1 = + csv.from( + csv: + " +#datatype,string,long,dateTime:RFC3339,double +#group,false,false,false,false +#default,_result,,, +,result,table,_time,_value +,,0,2021-01-01T00:00:00Z,1.5 +,,0,2021-01-01T00:01:00Z,2.5 +#datatype,string,long,dateTime:RFC3339,double,string +#group,false,false,false,false,true +#default,_result,,,, +,result,table,_time,_value,key +,,1,2021-01-01T00:00:00Z,1.0,foo +,,1,2021-01-01T00:01:00Z,2.0,foo +", + ) +b1 = + csv.from( + csv: + " +#datatype,string,long,dateTime:RFC3339,double,string +#group,false,false,false,false,true +#default,_result,,,, +,result,table,_time,_value,key +,,0,2021-01-01T00:00:00Z,10.0,bar +,,0,2021-01-01T00:01:00Z,20.0,bar +", + ) + +// change in the result join schema on the fly as tables in left stream contains different schema +// left stream's, second table has extra column 'key' +testcase missing_column_on_left_stream_with_join_schema_change { + got = + join(tables: {a1, b1}, on: ["_time"]) + |> debug.slurp() + want = + csv.from( + csv: + " +#datatype,string,long,dateTime:RFC3339,double,double,string +#group,false,false,false,false,false,true +#default,_result,,,,, +,result,table,_time,_value_a1,_value_b1,key +,,0,2021-01-01T00:00:00Z,1.5,10.0,bar +,,0,2021-01-01T00:01:00Z,2.5,20.0,bar #datatype,string,long,dateTime:RFC3339,double,double,string,string -#group,false,false,false,false,false,false,true +#group,false,false,false,false,false,true,true #default,_result,,,,,, -,result,table,_time,_value_a,_value_b,key_a,key_b -,,1,2021-01-01T00:00:00Z,1.5,10.0,, -,,1,2021-01-01T00:01:00Z,2.5,20.0,, +,result,table,_time,_value_a1,_value_b1,key_a1,key_b1 +,,1,2021-01-01T00:00:00Z,1.0,10.0,foo,bar +,,1,2021-01-01T00:01:00Z,2.0,20.0,foo,bar +", + ) + + testing.diff(got, want) |> yield() + } + +a2 = + csv.from( + csv: + " +#datatype,string,long,dateTime:RFC3339,double,string +#group,false,false,false,false,true +#default,_result,,,, +,result,table,_time,_value,key +,,0,2021-01-01T00:00:00Z,1.0,foo +,,0,2021-01-01T00:01:00Z,2.0,foo +#datatype,string,long,dateTime:RFC3339,double +#group,false,false,false,false +#default,_result,,, +,result,table,_time,_value +,,1,2021-01-01T00:00:00Z,1.5 +,,1,2021-01-01T00:01:00Z,2.5 +", + ) +b2 = + csv.from( + csv: + " +#datatype,string,long,dateTime:RFC3339,double,double +#group,false,false,false,false,true +#default,_result,,,, +,result,table,_time,_value,key +,,0,2021-01-01T00:00:00Z,10.0,8.0 +,,0,2021-01-01T00:01:00Z,20.0,88.0 +", + ) + +// when a column exists on both sides but has a different type +// column 'key' is string on the left stream and double on the right stream +testcase same_column_on_both_stream_with_different_type { + got = + join(tables: {a2, b2}, on: ["_time"]) + |> debug.slurp() + want = + csv.from( + csv: + " +#datatype,string,long,dateTime:RFC3339,double,double,string,double +#group,false,false,false,false,false,true,true +#default,_result,,,,,, +,result,table,_time,_value_a2,_value_b2,key_a2,key_b2 +,,0,2021-01-01T00:00:00Z,1.0,10.0,foo,8.0 +,,0,2021-01-01T00:01:00Z,2.0,20.0,foo,8.0 +#datatype,string,long,dateTime:RFC3339,double,double,double +#group,false,false,false,false,false,true +#default,_result,,,,, +,result,table,_time,_value_a2,_value_b2,key +,,1,2021-01-01T00:00:00Z,1.5,10.0,8.0 +,,1,2021-01-01T00:01:00Z,2.5,20.0,8.0 ", ) testing.diff(got, want) |> yield() } + +a3 = + csv.from( + csv: + " +#datatype,string,long,dateTime:RFC3339,double,string +#group,false,false,false,false,true +#default,_result,,,, +,result,table,_time,_value,key +,,0,2021-01-01T00:00:00Z,1.0,key0 +,,0,2021-01-01T00:01:00Z,1.5,key0 +#datatype,string,long,dateTime:RFC3339,double,string,string +#group,false,false,false,false,true,true +#default,_result,,,,, +,result,table,_time,_value,key,gkey_1 +,,1,2021-01-01T00:00:00Z,2.0,key1,gkey1 +,,1,2021-01-01T00:01:00Z,2.5,key1,gkey1 +#datatype,string,long,dateTime:RFC3339,double,string,string +#group,false,false,false,false,true,true +#default,_result,,,,, +,result,table,_time,_value,key,gkey_2 +,,2,2021-01-01T00:00:00Z,3.0,key2,gkey2 +,,2,2021-01-01T00:01:00Z,3.5,key2,gkey2 +", + ) +b3 = + csv.from( + csv: + " +#datatype,string,long,dateTime:RFC3339,double,string +#group,false,false,false,false,true +#default,_result,,,, +,result,table,_time,_value,key +,,0,2021-01-01T00:00:00Z,10.0,key0 +,,0,2021-01-01T00:01:00Z,10.5,key0 +", + ) + +// the group key is different on left and right stream +// Left Stream - Right Stream - +// 0th table - key 0th table - key +// 1st table - key, gkey_1 +// 2nd table - key, gkey_2 +// Join on _time (non groupKey) +testcase different_group_key_on_left_and_right_stream_join_on_non_group_key { + got = + join(tables: {a3, b3}, on: ["_time"]) + |> debug.slurp() + want = + csv.from( + csv: + " +#datatype,string,long,dateTime:RFC3339,double,double,string,string +#group,false,false,false,false,false,true,true +#default,_result,,,,,, +,result,table,_time,_value_a3,_value_b3,key_a3,key_b3 +,,0,2021-01-01T00:00:00Z,1.0,10.0,key0,key0 +,,0,2021-01-01T00:01:00Z,1.5,10.5,key0,key0 +#datatype,string,long,dateTime:RFC3339,double,double,string,string,string +#group,false,false,false,false,false,true,true,true +#default,_result,,,,,,, +,result,table,_time,_value_a3,_value_b3,key_a3,key_b3,gkey_1 +,,1,2021-01-01T00:00:00Z,2.0,10.0,key1,key0,gkey1 +,,1,2021-01-01T00:01:00Z,2.5,10.5,key1,key0,gkey1 +#datatype,string,long,dateTime:RFC3339,double,double,string,string,string +#group,false,false,false,false,false,true,true,true +#default,_result,,,,,,, +,result,table,_time,_value_a3,_value_b3,key_a3,key_b3,gkey_2 +,,2,2021-01-01T00:00:00Z,3.0,10.0,key2,key0,gkey2 +,,2,2021-01-01T00:01:00Z,3.5,10.5,key2,key0,gkey2 +", + ) + + testing.diff(got, want) |> yield() + } + +s1 = + array.from(rows: [{unit: "A", power: 100}, {unit: "B", power: 200}, {unit: "C", power: 300}]) + |> group(columns: ["unit"]) + |> debug.opaque() + +s2 = + union( + tables: [ + array.from(rows: [{columnA: "valueA", unit: "A", group: "groupX"}]) + |> group(columns: ["columnA", "unit"]) + |> debug.opaque(), + array.from(rows: [{columnB: "valueB", unit: "B", group: "groupX"}]) + |> group(columns: ["columnB", "unit"]) + |> debug.opaque(), + array.from(rows: [{unit: "C", group: "groupX"}]) + |> group(columns: ["unit"]) + |> debug.opaque(), + ], + ) + +ra1 = array.from(rows: [{unit: "A", power: 100, group: "groupX", columnA: "valueA"}]) +ra2 = array.from(rows: [{unit: "B", power: 200, group: "groupX", columnB: "valueB"}]) +ra3 = array.from(rows: [{unit: "C", power: 300, group: "groupX"}]) + +testcase join_different_table_schemas_in_stream { + want = + union( + tables: [ + ra1 |> group(columns: ["columnA", "unit"]) |> debug.opaque(), + ra2 |> group(columns: ["columnB", "unit"]) |> debug.opaque(), + ra3 |> group(columns: ["unit"]) |> debug.opaque(), + ], + ) + + got = join(tables: {s1, s2}, on: ["unit"]) |> debug.opaque() + + testing.diff(want: want, got: got) |> yield() +} diff --git a/stdlib/universe/join_test.go b/stdlib/universe/join_test.go index 12593c5365..1a642b0c78 100644 --- a/stdlib/universe/join_test.go +++ b/stdlib/universe/join_test.go @@ -786,13 +786,12 @@ func TestMergeJoin_Process(t *testing.T) { {Label: "_time", Type: flux.TTime}, {Label: "_value_a", Type: flux.TFloat}, {Label: "_value_b", Type: flux.TFloat}, - {Label: "key_a", Type: flux.TString}, - {Label: "key_b", Type: flux.TString}, + {Label: "key", Type: flux.TString}, }, - KeyCols: []string{"key_b"}, + KeyCols: []string{"key"}, Data: [][]interface{}{ - {execute.Time(1), 1.5, 10.0, nil, "bar"}, - {execute.Time(2), 2.5, 20.0, nil, "bar"}, + {execute.Time(1), 1.5, 10.0, "bar"}, + {execute.Time(2), 2.5, 20.0, "bar"}, }, }, }, @@ -862,13 +861,12 @@ func TestMergeJoin_Process(t *testing.T) { {Label: "_time", Type: flux.TTime}, {Label: "_value_a", Type: flux.TFloat}, {Label: "_value_b", Type: flux.TFloat}, - {Label: "key_a", Type: flux.TString}, - {Label: "key_b", Type: flux.TString}, + {Label: "key", Type: flux.TString}, }, - KeyCols: []string{"key_b"}, + KeyCols: []string{"key"}, Data: [][]interface{}{ - {execute.Time(1), 1.5, 10.0, nil, nil}, - {execute.Time(2), 2.5, 20.0, nil, nil}, + {execute.Time(1), 1.5, 10.0, nil}, + {execute.Time(2), 2.5, 20.0, nil}, }, }, }, @@ -1651,13 +1649,6 @@ func TestMergeJoin_Process(t *testing.T) { }, }, { - // Give one table in data0 an extra column. - // When join tries to look up that column name in the column index map, - // it will get a value of 0. - // - // Prior to #4310, this would cause the join transformation to try to - // append whatever value was in the extra column to the column at index 0. - // If they did not have the same type, join would panic. name: "extra column", spec: &universe.MergeJoinProcedureSpec{ On: []string{"_time", "Alias", "Device", "SerialNumber"}, @@ -1738,8 +1729,23 @@ func TestMergeJoin_Process(t *testing.T) { {"SIM-SAM-M169", int64(1), "12345", execute.Time(1), 8.4, 8.4, 1.2}, }, }, + { + KeyCols: []string{"Alias", "Device", "SerialNumber", "_time"}, + ColMeta: []flux.ColMeta{ + {Label: "Alias", Type: flux.TString}, + {Label: "Device", Type: flux.TInt}, + {Label: "SerialNumber", Type: flux.TString}, + {Label: "_time", Type: flux.TTime}, + {Label: "Gauge", Type: flux.TFloat}, + {Label: "Pitch_a", Type: flux.TFloat}, + {Label: "Pitch_b", Type: flux.TFloat}, + {Label: "Angle", Type: flux.TFloat}, + }, + Data: [][]interface{}{ + {"SIM-SAM-M169", int64(2), "13579", execute.Time(1), 9.3, 9.3, 9.3, 5.6}, + }, + }, }, - wantErr: errors.New("column 'Gauge' not found in join schema"), }, } for _, tc := range testCases {