
feat(spark): add Window support #307

Merged 1 commit into substrait-io:main on Oct 25, 2024
Conversation

@andrew-coleman (Contributor) commented Oct 16, 2024

To support the OVER clause in SQL

This fixes some of the TPC-DS tests.

@vbarua (Member) commented Oct 16, 2024

> This will also fix several of the TPC-DS tests, but I will enable those in a future PR to try and avoid merge conflicts with other PRs that are currently open.

I've gone ahead and reviewed and merged the other PRs so we shouldn't have any issues with conflict there, beyond the one that exists now. I can review this PR tomorrow.

@vbarua (Member) left a comment:

Left some questions.

WindowSpecDefinition(_, _, SpecifiedWindowFrame(frameType, lower, upper))) =>
(fromSpark(frameType), fromSparkPreceding(lower), fromSparkFollowing(upper))
case WindowExpression(_, WindowSpecDefinition(_, _, UnspecifiedFrame)) =>
(WindowBoundsType.ROWS, UNBOUNDED, CURRENT_ROW)
@vbarua (Member):

I was curious about the default behaviour when the frame is unspecified, because for Postgres it is RANGE UNBOUNDED PRECEDING; for Spark, however, it depends on whether an ordering is defined:

 * @note
 *   When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding,
 *   unboundedFollowing) is used by default. When ordering is defined, a growing window frame
 *   (rangeFrame, unboundedPreceding, currentRow) is used by default.

https://github.com/apache/spark/blob/f8d92224b9af4ffffbb83ca2c9dd3c3b909b135d/sql/api/src/main/scala/org/apache/spark/sql/expressions/Window.scala#L35-L38
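That doc comment can be modelled as a tiny standalone sketch. The type names below are illustrative stand-ins, not Spark's actual classes:

```scala
// Illustrative stand-ins for Spark's frame types and special bounds.
sealed trait FrameType
case object RowFrame extends FrameType
case object RangeFrame extends FrameType

sealed trait Bound
case object UnboundedPreceding extends Bound
case object UnboundedFollowing extends Bound
case object CurrentRow extends Bound

// Spark's documented default: a fully unbounded ROWS frame when no ordering
// is defined, and a growing RANGE frame (unbounded preceding to current row)
// when an ordering is defined.
def defaultFrame(hasOrdering: Boolean): (FrameType, Bound, Bound) =
  if (hasOrdering) (RangeFrame, UnboundedPreceding, CurrentRow)
  else (RowFrame, UnboundedPreceding, UnboundedFollowing)
```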

@andrew-coleman (Contributor, Author):

Good spot, I hadn't noticed that comment.


val (frameType, lower, upper) = sparkExp match {
case WindowExpression(_: OffsetWindowFunction, _) =>
(WindowBoundsType.ROWS, UNBOUNDED, CURRENT_ROW)
@vbarua (Member):

What is an OffsetWindowFunction? Does it somehow override the frame bounds?

@andrew-coleman (Contributor, Author):

That's my understanding. TBH I'm not sure exactly how it should translate to Substrait, but it's ignored by Spark (it matches the lag/lead functions):
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L404

@Blizzara (Contributor):

You should actually handle the frame here properly as well. Otherwise the bounds will be incorrect for other consumers (unbounded preceding to current row will not work for Lead, for example) and Spark will throw. You'll see that if you add logicalPlan2.show() into SubstraitPlanTestBase::assertSqlToSubstraitRoundTrip and re-run the lag/lead test. (We should also make the tests always evaluate the converted plans to ensure they are actually valid; I'll try to get to that.)

Can be fixed by just removing this case.

@andrew-coleman (Contributor, Author):

OK, I've removed it for now.

@Blizzara (Contributor) left a comment:
Thanks! Overall this looks good, but there are some things in the bounds handling that I think aren't always correct.

.append("sorts=")
.append(window.getSorts)
})
}
@Blizzara (Contributor):

FWIW, in our internal fork I just deleted the whole RelToVerboseString thing. I don't see it bringing much value over pretty-printing the protobuf, and there's a fair amount of work to maintain it.

@@ -73,9 +73,22 @@ class FunctionMappings {
s[HyperLogLogPlusPlus]("approx_count_distinct")
)

val WINDOW_SIGS: Seq[Sig] = Seq(
s[RowNumber]("row_number"),
s[Rank]("rank"),
@Blizzara (Contributor):

From looking at our internal fork's window impl, the rank functions in Spark have some child column but Substrait doesn't define any. I wonder how it works here, i.e. how do we still get a match?


s[CumeDist]("cume_dist"),
s[NTile]("ntile"),
s[Lead]("lead"),
s[Lag]("lag"),
@Blizzara (Contributor):

I also have a note about Lead and Lag having a NullType null as a child, which I think Substrait doesn't support. Did you run into anything like that?

@andrew-coleman (Contributor, Author):

I haven't seen this - do you have a test case?

@Blizzara (Contributor) commented Oct 25, 2024:
@andrew-coleman (Contributor, Author):

Oh yes - not known for my memory! 😂

case other => throw new UnsupportedOperationException(s"Unsupported bounds type: $other.")
}

def fromSparkPreceding(bound: Expression): WindowBound = bound match {
@Blizzara (Contributor):

Substrait defines the bounds as strictly positive integers, but IIRC Spark may have, for example, a "preceding -1 row". This is what I used:

    expr match {
      case UnboundedPreceding => WindowBound.UNBOUNDED
      case UnboundedFollowing => WindowBound.UNBOUNDED
      case CurrentRow => WindowBound.CURRENT_ROW
      case e: Literal =>
        e.dataType match {
          case IntegerType => {
            val offset = e.eval().asInstanceOf[Int]
            if (offset < 0) WindowBound.Preceding.of(-offset)
            else if (offset == 0) WindowBound.CURRENT_ROW
            else WindowBound.Following.of(offset)
          }
        }
      case _ => throw new UnsupportedOperationException(s"Unexpected bound: $expr")
    }
  }
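The sign-normalization rule in that snippet (Spark's signed offsets versus Substrait's strictly positive bounds) can be shown standalone. The types below are hypothetical stand-ins, not the substrait-java API:

```scala
// Hypothetical stand-ins for Substrait-style window bounds; Substrait
// requires the preceding/following offsets to be strictly positive.
sealed trait WindowBound
case class Preceding(offset: Int) extends WindowBound { require(offset > 0) }
case class Following(offset: Int) extends WindowBound { require(offset > 0) }
case object CurrentRow extends WindowBound

// A negative Spark offset means "preceding", zero means the current row,
// and a positive offset means "following"; the stored offset is positive.
def normalize(sparkOffset: Int): WindowBound =
  if (sparkOffset < 0) Preceding(-sparkOffset)
  else if (sparkOffset == 0) CurrentRow
  else Following(sparkOffset)
```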

@andrew-coleman (Contributor, Author):

Thanks, I've changed it to this.

case UNBOUNDED => UnboundedPreceding
case CURRENT_ROW => CurrentRow
case p: Preceding => Literal(p.offset())
case _ => throw new UnsupportedOperationException(s"Unsupported bounds expression $bound")
@Blizzara (Contributor):

I think this also doesn't work, for the same reason as above: func.lowerBound() may very well be a Following (and vice versa below), so you should handle both cases in both places.

Our version was a bit different; yours might be nicer (once fixed):

def toSparkFrame(
      boundsType: WindowBoundsType,
      lowerBound: WindowBound,
      upperBound: WindowBound): WindowFrame = {
    val frameType = boundsType match {
      case WindowBoundsType.ROWS => RowFrame
      case WindowBoundsType.RANGE => RangeFrame
      case WindowBoundsType.UNSPECIFIED => return UnspecifiedFrame
    }
    SpecifiedWindowFrame(
      frameType,
      toSparkBound(lowerBound, isLower = true),
      toSparkBound(upperBound, isLower = false))
  }

  private def toSparkBound(bound: WindowBound, isLower: Boolean): Expression = {
    bound.accept(new WindowBoundVisitor[Expression, Exception] {

      override def visit(preceding: WindowBound.Preceding): Expression =
        Literal(-preceding.offset().intValue())

      override def visit(following: WindowBound.Following): Expression =
        Literal(following.offset().intValue())

      override def visit(currentRow: WindowBound.CurrentRow): Expression = CurrentRow

      override def visit(unbounded: WindowBound.Unbounded): Expression =
        if (isLower) UnboundedPreceding else UnboundedFollowing
    })
  }

@andrew-coleman (Contributor, Author):

Yours is nicer :).

|
|""".stripMargin
assertSqlSubstraitRelRoundTrip(query)
}
@Blizzara (Contributor):

Maybe add a test with two different partitions in the same select? I think it should pass fine, but just in case.
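A non-runnable sketch of such a test, in the style of the surrounding suite; the table and column names are placeholders, not taken from the PR:

```scala
// Hypothetical round-trip test: two window expressions with different
// PARTITION BY clauses in the same select.
test("two partitions in one select") {
  val query =
    """SELECT
      |  sum(l_tax)   OVER (PARTITION BY l_suppkey ORDER BY l_discount),
      |  row_number() OVER (PARTITION BY l_partkey ORDER BY l_discount)
      |FROM lineitem
      |""".stripMargin
  assertSqlSubstraitRelRoundTrip(query)
}
```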

assertSqlSubstraitRelRoundTrip(query)
}

test("min") {
@Blizzara (Contributor):

Nit: maybe name this "aggregate" or similar, as I think that's what it's testing (the fact that it's min specifically is less relevant)?

Suggested change:
-test("min") {
+test("aggregate") {

@andrew-coleman (Contributor, Author):

Thanks @Blizzara, there’s some really useful feedback here. I hadn’t realised you had already been implementing this, it would be awesome if you want to collaborate :)

I’d only implemented as much as necessary to enable the windowing tests in the TPC-DS suite to pass. If you’ve got other tests that require additional logic in the converter, then it would be great to add these. But perhaps in a future PR in the interest of moving things forward incrementally?

@Blizzara (Contributor):

> Thanks @Blizzara, there’s some really useful feedback here. I hadn’t realised you had already been implementing this, it would be awesome if you want to collaborate :)

Yea, shame on me: it's been on my todo list to pull our changes up ever since you added this into substrait-java. Some of those changes are non-trivial refactorings which might be annoying, but I'll try to get to it!

> I’d only implemented as much as necessary to enable the windowing tests in the TPC-DS suite to pass. If you’ve got other tests that require additional logic in the converter, then it would be great to add these. But perhaps in a future PR in the interest of moving things forward incrementally?

I'd like to see the bounds thing fixed, since currently this implementation doesn't adhere to the bounds (preceding, following) being strictly positive. That's not a problem for the round-trip tests, but it means a plan generated here may not be valid Substrait that can be consumed by other consumers.

@Blizzara (Contributor):

> Thanks @Blizzara, there’s some really useful feedback here. I hadn’t realised you had already been implementing this, it would be awesome if you want to collaborate :)

Starting with #311! After that I'll add structs (+ fix the name handling), and then I have some more complicated changes in how FunctionMappings works to allow for more complicated mappings.

To support the OVER clause in SQL

Signed-off-by: Andrew Coleman <[email protected]>
WindowSpecDefinition(_, _, SpecifiedWindowFrame(frameType, lower, upper))) =>
(fromSpark(frameType), fromSpark(lower), fromSpark(upper))
case WindowExpression(_, WindowSpecDefinition(_, orderSpec, UnspecifiedFrame)) =>
if (orderSpec.isEmpty) {
val successfulSQL: Set[String] = Set("q1", "q3", "q4", "q5", "q7", "q8",
"q21", "q22", "q23a", "q23b", "q24a", "q24b", "q25", "q26", "q27", "q28", "q29",
"q30", "q31", "q32", "q33", "q37", "q38",
"q40", "q41", "q42", "q43", "q46", "q48",
@Blizzara (Contributor):

Given how this list is growing, it might be worth starting to list the ones that don't work instead 😄

@andrew-coleman (Contributor, Author):

Exactly. I was going to do this after my next PR, which will add support for more numeric functions; the number of failing tests will be relatively small then.

@Blizzara (Contributor) left a comment:

LGTM, thanks!

@Blizzara Blizzara merged commit b3f61a2 into substrait-io:main Oct 25, 2024
13 checks passed
@andrew-coleman andrew-coleman deleted the window branch October 25, 2024 16:11