Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Join function does not properly handle divergent schemas being joined #4560

Merged
merged 1 commit into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/integral_test.flux": "a2b3d16ad20068af8ac7b960b152beb5d4a0319a2b07210467c8cfcdd55ed559",
"stdlib/universe/join_across_measurements_test.flux": "61d69faa82616287ac875e8878e9c8095a46d638e0cecdc30f9db26763ca1eec",
"stdlib/universe/join_agg_test.flux": "34b8eb508967c31d52dbe4a225d9b48c5ec1177e8021dc98267c5956f285ffdf",
"stdlib/universe/join_mismatched_schema_test.flux": "152ebc1fe1f3bf65ab84cf60582ac5b4618538a4c70eb729ab52a66183c58dbf",
"stdlib/universe/join_mismatched_schema_test.flux": "ec93ea26357783c15da7ac9be3b9b60d01122ad5fd482b802d5d9a6e7b3e5fb2",
"stdlib/universe/join_missing_on_col_test.flux": "3d15e1bb1186ffbf39d68fb2698eca0812608a77be3592d5559fc2a3f5c0ae4a",
"stdlib/universe/join_panic_test.flux": "d974919167b6bf8c0ce323f23f09aebccdad7ba4fbe1fd3cc1cd90402d9af28f",
"stdlib/universe/join_test.flux": "7265656eb22c659f97e4ae15a4b6538f9081c2aef344a0131bcee09bdc6a61f8",
Expand Down
57 changes: 5 additions & 52 deletions stdlib/universe/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (t *mergeJoinTransformation) Process(id execute.DatasetID, tbl flux.Table)

// Check if enough data sources have been seen to produce an output schema
if !t.cache.isBufferEmpty(t.leftID) && !t.cache.isBufferEmpty(t.rightID) && !t.cache.postJoinSchemaBuilt() {
t.cache.buildPostJoinSchema()
t.cache.buildPostJoinSchema(t.cache.schemas[t.cache.leftID].columns, t.cache.schemas[t.cache.rightID].columns)
}

// Register any new output group keys that can be constructed from the new table
Expand Down Expand Up @@ -799,9 +799,7 @@ func (c *MergeJoinCache) postJoinSchemaBuilt() bool {
return c.schemaMap != nil
}

func (c *MergeJoinCache) buildPostJoinSchema() {
left := c.schemas[c.leftID].columns
right := c.schemas[c.rightID].columns
func (c *MergeJoinCache) buildPostJoinSchema(left, right []flux.ColMeta) {

// Find column names shared between the two tables
shared := make(map[string]bool, len(left))
Expand Down Expand Up @@ -837,46 +835,6 @@ func (c *MergeJoinCache) buildPostJoinSchema() {
}
}

// missingColIdx reports the indexes of post-join output columns that neither
// input table contributes to. Every left and right column is mapped through
// the post-join schema (c.schemaMap) to its output index; any index in
// c.colIndex that is never reached this way is returned. If either side has
// no columns at all, a nil slice is returned.
func (c *MergeJoinCache) missingColIdx(lCols, rCols []flux.ColMeta) []int {
	if len(lCols) == 0 || len(rCols) == 0 {
		return nil
	}

	// Record every output index that some input column maps onto.
	seen := make(map[int]struct{}, len(lCols)+len(rCols))
	for _, col := range lCols {
		key := tableCol{table: c.names[c.leftID], col: col.Label}
		seen[c.colIndex[c.schemaMap[key]]] = struct{}{}
	}
	for _, col := range rCols {
		key := tableCol{table: c.names[c.rightID], col: col.Label}
		seen[c.colIndex[c.schemaMap[key]]] = struct{}{}
	}

	// Anything in the output schema that was never marked is missing.
	var missing []int
	for _, idx := range c.colIndex {
		if _, ok := seen[idx]; !ok {
			missing = append(missing, idx)
		}
	}
	return missing
}

func (c *MergeJoinCache) join(left, right *execute.ColListTableBuilder) (flux.Table, error) {
// Sort input tables
left.Sort(c.order, false)
Expand All @@ -888,6 +846,9 @@ func (c *MergeJoinCache) join(left, right *execute.ColListTableBuilder) (flux.Ta
leftSet, leftKey = c.advance(leftSet.Stop, left)
rightSet, rightKey = c.advance(rightSet.Stop, right)

// Build the post-join schema; this handles the cases where tables in the stream have different schemas
c.buildPostJoinSchema(left.Cols(), right.Cols())

keys := map[execute.DatasetID]flux.GroupKey{
c.leftID: left.Key(),
c.rightID: right.Key(),
Expand All @@ -904,9 +865,6 @@ func (c *MergeJoinCache) join(left, right *execute.ColListTableBuilder) (flux.Ta
}
}

// find the missing column indexes between the two tables
missingIdxs := c.missingColIdx(left.Cols(), right.Cols())

// Perform sort merge join
for !leftSet.Empty() && !rightSet.Empty() {
if leftKey.EqualTrueNulls(rightKey) {
Expand Down Expand Up @@ -963,11 +921,6 @@ func (c *MergeJoinCache) join(left, right *execute.ColListTableBuilder) (flux.Ta
if err != nil {
return nil, err
}

// append nil for the missing column in the result table
for _, mIdx := range missingIdxs {
_ = builder.AppendNil(mIdx)
}
}
}
leftSet, leftKey = c.advance(leftSet.Stop, left)
Expand Down
244 changes: 239 additions & 5 deletions stdlib/universe/join_mismatched_schema_test.flux
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package universe_test


import "csv"
import "array"
import "testing"
import "internal/debug"

Expand Down Expand Up @@ -38,7 +39,8 @@ b =
",
)

testcase normal {
// the left stream's second table is missing the 'key' column
testcase missing_column_on_left_stream {
got =
join(tables: {a, b}, on: ["_time"])
|> debug.slurp()
Expand All @@ -54,14 +56,246 @@ testcase normal {
,,0,2021-01-01T00:00:00Z,1.0,10.0,foo,
,,0,2021-01-01T00:01:00Z,2.0,20.0,foo,

#datatype,string,long,dateTime:RFC3339,double,double,string
#group,false,false,false,false,false,true
#default,_result,,,,,
,result,table,_time,_value_a,_value_b,key
,,1,2021-01-01T00:00:00Z,1.5,10.0,
,,1,2021-01-01T00:01:00Z,2.5,20.0,
",
)

testing.diff(got, want) |> yield()
}

// a1: left input stream — the first table has no 'key' column; the second
// table adds a string 'key' group column, so the stream's schema diverges.
a1 =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,double
#group,false,false,false,false
#default,_result,,,
,result,table,_time,_value
,,0,2021-01-01T00:00:00Z,1.5
,,0,2021-01-01T00:01:00Z,2.5
#datatype,string,long,dateTime:RFC3339,double,string
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,key
,,1,2021-01-01T00:00:00Z,1.0,foo
,,1,2021-01-01T00:01:00Z,2.0,foo
",
)
// b1: right input stream — a single table with a string 'key' group column.
b1 =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,double,string
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,key
,,0,2021-01-01T00:00:00Z,10.0,bar
,,0,2021-01-01T00:01:00Z,20.0,bar
",
)

// The result join schema changes on the fly because the tables in the left
// stream have different schemas: the left stream's second table has an
// extra 'key' column.
testcase missing_column_on_left_stream_with_join_schema_change {
got =
join(tables: {a1, b1}, on: ["_time"])
|> debug.slurp()
// NOTE(review): the expected CSV below contains two consecutive '#group'
// rows and two header rows for table 1 — presumably reflecting the
// mid-stream schema change; confirm against the annotated-CSV format.
want =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,double,double,string
#group,false,false,false,false,false,true
#default,_result,,,,,
,result,table,_time,_value_a1,_value_b1,key
,,0,2021-01-01T00:00:00Z,1.5,10.0,bar
,,0,2021-01-01T00:01:00Z,2.5,20.0,bar
#datatype,string,long,dateTime:RFC3339,double,double,string,string
#group,false,false,false,false,false,false,true
#group,false,false,false,false,false,true,true
#default,_result,,,,,,
,result,table,_time,_value_a,_value_b,key_a,key_b
,,1,2021-01-01T00:00:00Z,1.5,10.0,,
,,1,2021-01-01T00:01:00Z,2.5,20.0,,
,result,table,_time,_value_a1,_value_b1,key_a1,key_b1
,,1,2021-01-01T00:00:00Z,1.0,10.0,foo,bar
,,1,2021-01-01T00:01:00Z,2.0,20.0,foo,bar
",
)

testing.diff(got, want) |> yield()
}

// a2: left input stream — 'key' is a string group column in the first
// table and absent from the second.
a2 =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,double,string
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,key
,,0,2021-01-01T00:00:00Z,1.0,foo
,,0,2021-01-01T00:01:00Z,2.0,foo
#datatype,string,long,dateTime:RFC3339,double
#group,false,false,false,false
#default,_result,,,
,result,table,_time,_value
,,1,2021-01-01T00:00:00Z,1.5
,,1,2021-01-01T00:01:00Z,2.5
",
)
// b2: right input stream — 'key' exists too, but as a double, so the
// column has a different type on each side.
b2 =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,double,double
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,key
,,0,2021-01-01T00:00:00Z,10.0,8.0
,,0,2021-01-01T00:01:00Z,20.0,88.0
",
)

// When a column exists on both sides but has a different type:
// column 'key' is a string on the left stream and a double on the right.
testcase same_column_on_both_stream_with_different_type {
got =
join(tables: {a2, b2}, on: ["_time"])
|> debug.slurp()
want =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,double,double,string,double
#group,false,false,false,false,false,true,true
#default,_result,,,,,,
,result,table,_time,_value_a2,_value_b2,key_a2,key_b2
,,0,2021-01-01T00:00:00Z,1.0,10.0,foo,8.0
,,0,2021-01-01T00:01:00Z,2.0,20.0,foo,8.0
#datatype,string,long,dateTime:RFC3339,double,double,double
#group,false,false,false,false,false,true
#default,_result,,,,,
,result,table,_time,_value_a2,_value_b2,key
,,1,2021-01-01T00:00:00Z,1.5,10.0,8.0
,,1,2021-01-01T00:01:00Z,2.5,20.0,8.0
",
)

testing.diff(got, want) |> yield()
}

// a3: left input stream — three tables whose group keys differ:
// table 0 groups by 'key' only; tables 1 and 2 add 'gkey_1' / 'gkey_2'.
a3 =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,double,string
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,key
,,0,2021-01-01T00:00:00Z,1.0,key0
,,0,2021-01-01T00:01:00Z,1.5,key0
#datatype,string,long,dateTime:RFC3339,double,string,string
#group,false,false,false,false,true,true
#default,_result,,,,,
,result,table,_time,_value,key,gkey_1
,,1,2021-01-01T00:00:00Z,2.0,key1,gkey1
,,1,2021-01-01T00:01:00Z,2.5,key1,gkey1
#datatype,string,long,dateTime:RFC3339,double,string,string
#group,false,false,false,false,true,true
#default,_result,,,,,
,result,table,_time,_value,key,gkey_2
,,2,2021-01-01T00:00:00Z,3.0,key2,gkey2
,,2,2021-01-01T00:01:00Z,3.5,key2,gkey2
",
)
// b3: right input stream — a single table grouped by 'key' only.
b3 =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,double,string
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,key
,,0,2021-01-01T00:00:00Z,10.0,key0
,,0,2021-01-01T00:01:00Z,10.5,key0
",
)

// The group key is different on the left and right streams:
// Left stream -              Right stream -
// 0th table - key            0th table - key
// 1st table - key, gkey_1
// 2nd table - key, gkey_2
// Join on _time (a non-group-key column).
testcase different_group_key_on_left_and_right_stream_join_on_non_group_key {
got =
join(tables: {a3, b3}, on: ["_time"])
|> debug.slurp()
want =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,double,double,string,string
#group,false,false,false,false,false,true,true
#default,_result,,,,,,
,result,table,_time,_value_a3,_value_b3,key_a3,key_b3
,,0,2021-01-01T00:00:00Z,1.0,10.0,key0,key0
,,0,2021-01-01T00:01:00Z,1.5,10.5,key0,key0
#datatype,string,long,dateTime:RFC3339,double,double,string,string,string
#group,false,false,false,false,false,true,true,true
#default,_result,,,,,,,
,result,table,_time,_value_a3,_value_b3,key_a3,key_b3,gkey_1
,,1,2021-01-01T00:00:00Z,2.0,10.0,key1,key0,gkey1
,,1,2021-01-01T00:01:00Z,2.5,10.5,key1,key0,gkey1
#datatype,string,long,dateTime:RFC3339,double,double,string,string,string
#group,false,false,false,false,false,true,true,true
#default,_result,,,,,,,
,result,table,_time,_value_a3,_value_b3,key_a3,key_b3,gkey_2
,,2,2021-01-01T00:00:00Z,3.0,10.0,key2,key0,gkey2
,,2,2021-01-01T00:01:00Z,3.5,10.5,key2,key0,gkey2
",
)

testing.diff(got, want) |> yield()
}

// s1: left input stream — one table per 'unit' value (A, B, C).
// debug.opaque() hides the producer so join sees the stream as opaque.
s1 =
array.from(rows: [{unit: "A", power: 100}, {unit: "B", power: 200}, {unit: "C", power: 300}])
|> group(columns: ["unit"])
|> debug.opaque()

// s2: right input stream — a union of three tables, each with a different
// schema and a different group key (columnA+unit, columnB+unit, unit only).
s2 =
union(
tables: [
array.from(rows: [{columnA: "valueA", unit: "A", group: "groupX"}])
|> group(columns: ["columnA", "unit"])
|> debug.opaque(),
array.from(rows: [{columnB: "valueB", unit: "B", group: "groupX"}])
|> group(columns: ["columnB", "unit"])
|> debug.opaque(),
array.from(rows: [{unit: "C", group: "groupX"}])
|> group(columns: ["unit"])
|> debug.opaque(),
],
)

// ra1/ra2/ra3: per-unit expected result rows for the testcase below.
ra1 = array.from(rows: [{unit: "A", power: 100, group: "groupX", columnA: "valueA"}])
ra2 = array.from(rows: [{unit: "B", power: 200, group: "groupX", columnB: "valueB"}])
ra3 = array.from(rows: [{unit: "C", power: 300, group: "groupX"}])

// Joining streams whose tables have divergent schemas and group keys:
// the expected output mirrors the per-table grouping of the s2 stream.
testcase join_different_table_schemas_in_stream {
want =
union(
tables: [
ra1 |> group(columns: ["columnA", "unit"]) |> debug.opaque(),
ra2 |> group(columns: ["columnB", "unit"]) |> debug.opaque(),
ra3 |> group(columns: ["unit"]) |> debug.opaque(),
],
)

got = join(tables: {s1, s2}, on: ["unit"]) |> debug.opaque()

testing.diff(want: want, got: got) |> yield()
}
Loading