From fcef0610597763d1dcdb8b86faa0b4312abc881b Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Thu, 22 Aug 2019 11:38:28 +0800 Subject: [PATCH] executor: fix wrong partition boundary for window funcions (#11637) --- executor/window.go | 12 +++--------- executor/window_test.go | 4 ++++ util/chunk/chunk.go | 6 +++--- util/chunk/row.go | 2 +- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/executor/window.go b/executor/window.go index ee151cfd80fe6..b1abd402dc799 100644 --- a/executor/window.go +++ b/executor/window.go @@ -52,7 +52,7 @@ func (e *WindowExec) Close() error { // Next implements the Executor Next interface. func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() - if e.meetNewGroup && e.remainingRowsInGroup > 0 { + if (e.executed || e.meetNewGroup) && e.remainingRowsInGroup > 0 { err := e.appendResult2Chunk(chk) if err != nil { return err @@ -78,22 +78,16 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro if err != nil { return errors.Trace(err) } - if e.meetNewGroup { + if e.meetNewGroup && e.remainingRowsInGroup > 0 { err := e.consumeGroupRows() if err != nil { return errors.Trace(err) } err = e.appendResult2Chunk(chk) - if err != nil { - return errors.Trace(err) - } + return err } e.remainingRowsInGroup++ e.groupRows = append(e.groupRows, e.inputRow) - if e.meetNewGroup { - e.inputRow = e.inputIter.Next() - return nil - } } return nil } diff --git a/executor/window_test.go b/executor/window_test.go index cfca8c62d4796..736be720cbbad 100644 --- a/executor/window_test.go +++ b/executor/window_test.go @@ -172,6 +172,10 @@ func (s *testSuite4) TestWindowFunctions(c *C) { result.Check(testkit.Rows("1 3", "2 3", "3 6", "4 6")) result = tk.MustQuery("select row_number() over w, sum(b) over w from t window w as (rows between 1 preceding and 1 following)") result.Check(testkit.Rows("1 3", "2 4", "3 5", "4 3")) + + tk.Se.GetSessionVars().MaxChunkSize = 1 + result = tk.MustQuery("select a, row_number() over (partition by a) from t") + result.Check(testkit.Rows("1 1", "1 2", "2 1", "2 2")) } func (s *testSuite4) TestWindowFunctionsIssue11614(c *C) { diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index 552a88dc126e8..ffc7cd55abfa4 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -80,7 +80,7 @@ func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk { // renewWithCapacity creates a new Chunk based on an existing Chunk with capacity. The newly // created Chunk has the same data schema with the old Chunk. -func renewWithCapacity(chk *Chunk, cap int) *Chunk { +func renewWithCapacity(chk *Chunk, cap, maxChunkSize int) *Chunk { newChk := new(Chunk) if chk.columns == nil { return newChk @@ -88,7 +88,7 @@ func renewWithCapacity(chk *Chunk, cap int) *Chunk { newChk.columns = renewColumns(chk.columns, cap) newChk.numVirtualRows = 0 newChk.capacity = cap - newChk.requiredRows = cap + newChk.requiredRows = maxChunkSize return newChk } @@ -99,7 +99,7 @@ func renewWithCapacity(chk *Chunk, cap int) *Chunk { // maxChunkSize: the limit for the max number of rows. func Renew(chk *Chunk, maxChunkSize int) *Chunk { newCap := reCalcCapacity(chk, maxChunkSize) - return renewWithCapacity(chk, newCap) + return renewWithCapacity(chk, newCap, maxChunkSize) } // renewColumns creates the columns of a Chunk. The capacity of the newly diff --git a/util/chunk/row.go b/util/chunk/row.go index 1ef88d2aba125..0d83c4dec5525 100644 --- a/util/chunk/row.go +++ b/util/chunk/row.go @@ -211,7 +211,7 @@ func (r Row) IsNull(colIdx int) bool { // CopyConstruct creates a new row and copies this row's data into it. func (r Row) CopyConstruct() Row { - newChk := renewWithCapacity(r.c, 1) + newChk := renewWithCapacity(r.c, 1, 1) newChk.AppendRow(r) return newChk.GetRow(0) }