
fix: Join function does not properly handle divergent schemas being joined together #4560

Merged Mar 21, 2022 (1 commit into master from skartikey-flux-join-bug)

Conversation

@skartikey (Contributor) commented Mar 15, 2022

When one of the input streams passed into join contains tables with different schemas, it causes join to fail. This is because join produces the schema of its final output based on the first table it finds in each of the two input streams it receives. It does this once at the beginning of the transformation. Then, while it's processing tables, if it finds a column that it doesn't recognize as part of the schema, it throws an error.

Ideally, the join transformation would be able to handle this situation gracefully by just adding the newly found column to the schema, and populating any rows that don't have a value for that column with nulls. The goal of this fix is to modify join so that it can do just that.
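The intended behavior can be sketched in Go (the language join is implemented in). This is a minimal, self-contained illustration with hypothetical names (`ColMeta`, `mergeSchema`), not the actual flux package API: instead of erroring on an unrecognized column, the output schema is simply widened.

```go
package main

import "fmt"

// ColMeta is a simplified stand-in for Flux's column metadata
// (hypothetical type; the real join works on flux table schemas).
type ColMeta struct {
	Label string
	Type  string
}

// mergeSchema widens the output schema with any columns the incoming
// table has that the output schema lacks, instead of raising an error
// as the pre-fix join did.
func mergeSchema(output, incoming []ColMeta) []ColMeta {
	seen := make(map[string]bool, len(output))
	for _, c := range output {
		seen[c.Label] = true
	}
	for _, c := range incoming {
		if !seen[c.Label] {
			output = append(output, c)
			seen[c.Label] = true
		}
	}
	return output
}

func main() {
	// Schema derived once from the first table of each stream.
	schema := []ColMeta{{"unit", "string"}, {"power", "int"}}
	// A later table introduces columnA; previously this was an error.
	schema = mergeSchema(schema, []ColMeta{{"unit", "string"}, {"columnA", "string"}})
	for _, c := range schema {
		fmt.Printf("%s:%s\n", c.Label, c.Type)
	}
}
```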

Related issues: #4310

Fixes: #4506, #4315

Done checklist

  • docs/SPEC.md updated
  • Test cases written

@skartikey skartikey requested review from a team as code owners March 15, 2022 14:57
@skartikey skartikey requested review from onelson and lwandzura and removed request for a team March 15, 2022 14:57
@skartikey (Contributor, Author) commented Mar 15, 2022

I tried running the code example from the issue:

import "array"
import "internal/debug"

s1 = array.from(rows: [{unit: "A", power: 100}, {unit: "B", power: 200}, {unit: "C", power: 300}])

s2 =
    union(
        tables: [
            array.from(rows: [{columnA: "valueA", unit: "A", group: "groupX"}])
                |> group(columns: ["columnA", "unit"])
                |> debug.opaque(),
            array.from(rows: [{columnB: "valueB", unit: "B", group: "groupX"}])
                |> group(columns: ["columnB", "unit"])
                |> debug.opaque(),
            array.from(rows: [{unit: "C", group: "groupX"}]) |> group(columns: ["unit"]) |> debug.opaque(),
        ],
    )

join(tables: {s1, s2}, on: ["unit"])
    |> yield()

This gives the following output:

Result: _result
Table: keys: [unit]
           unit:string            group:string                   power:int
----------------------  ----------------------  --------------------------
                     C                  groupX                         300
Table: keys: [columnA, unit]
        columnA:string             unit:string            group:string                   power:int
----------------------  ----------------------  ----------------------  --------------------------
                valueA                       A                  groupX                         100
Table: keys: [columnB, unit]
        columnB:string             unit:string            group:string                   power:int
----------------------  ----------------------  ----------------------  --------------------------
                valueB                       B                  groupX                         200

While iterating over the tables of the input streams, whenever I find a column that is missing from the result schema, I add it to the schema on the fly. All of the added columns therefore stay in the result, and rows that have no value for them are filled with nil.
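The nil-filling step described above can be sketched as follows. This is a hedged illustration in Go with hypothetical names (`padRow`, map-based rows); the real join operates on flux table builders, not maps:

```go
package main

import "fmt"

// padRow places a row's values into the widened result schema,
// leaving nil for any column the row's source table did not have.
// (Sketch only; hypothetical representation of a row as a map.)
func padRow(row map[string]interface{}, schema []string) []interface{} {
	out := make([]interface{}, len(schema))
	for i, label := range schema {
		if v, ok := row[label]; ok {
			out[i] = v
		} // columns absent from this row stay nil
	}
	return out
}

func main() {
	// The unit "C" row never had columnA, so it is padded with nil.
	schema := []string{"unit", "power", "columnA"}
	fmt.Println(padRow(map[string]interface{}{"unit": "C", "power": 300}, schema))
}
```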

@skartikey skartikey requested a review from wolffcm March 15, 2022 15:59
@onelson (Contributor) left a comment

Had a question related to the L/R blocks being slightly different, but looks good to me otherwise.

Review comments on stdlib/universe/join.go (outdated) and stdlib/universe/join_test.go (resolved)
@wolffcm left a comment

I think you said you are still going to add more tests here, is that right?

The product changes look good to me. This version of join has lots of strange edge cases and is hard to understand. Hopefully we can replace it with something just as useful but less difficult to reason about.

For the tests you will be adding, what additional cases did you have in mind? The cases that I think tend to be confusing and bug-prone are those where some of the columns in the group key overlap with the join key, and/or where the group key differs between the two sides. Adding some cases like these that include missing columns/heterogeneous schemas seems like a good idea.

I can still think of some cases that should produce an error, like when a column exists on both sides but has a different type. I wonder if we have cases that cover that?
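The error case raised above — the same column existing on both sides with different types — could be checked along these lines. This is a sketch with hypothetical names (`ColMeta`, `checkColumnTypes`), not the actual join implementation:

```go
package main

import "fmt"

// ColMeta is a simplified stand-in for Flux's column metadata
// (hypothetical type for illustration).
type ColMeta struct {
	Label string
	Type  string
}

// checkColumnTypes reports an error when a column present on both
// sides of a join carries a different type on each side — a case
// that should still fail rather than be merged silently.
func checkColumnTypes(left, right []ColMeta) error {
	types := make(map[string]string, len(left))
	for _, c := range left {
		types[c.Label] = c.Type
	}
	for _, c := range right {
		if t, ok := types[c.Label]; ok && t != c.Type {
			return fmt.Errorf("column %q has type %s on the left but %s on the right", c.Label, t, c.Type)
		}
	}
	return nil
}

func main() {
	left := []ColMeta{{"unit", "string"}, {"power", "int"}}
	right := []ColMeta{{"unit", "string"}, {"power", "float"}}
	fmt.Println(checkColumnTypes(left, right))
}
```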

@skartikey skartikey marked this pull request as draft March 16, 2022 10:39
@skartikey (Contributor, Author) commented
Added the following test cases today:

  1. Same column on both streams with different types. [Passed]
  2. Different group keys on the left and right streams, joining on a non-group-key column. [Passed]
  3. Different group keys on the left and right streams, joining on a group-key column. [Failed]

In the third case, the code joins only the very first table from each of the left and right streams and discards the rest of the tables. I am looking into it.

@skartikey skartikey marked this pull request as ready for review March 21, 2022 13:25
@skartikey skartikey requested a review from wolffcm March 21, 2022 13:45
@wolffcm wolffcm merged commit 49ae3cd into master Mar 21, 2022
@skartikey skartikey deleted the skartikey-flux-join-bug branch March 30, 2022 16:07