Merge pull request #21703 from arjunravinarayan/sorter_row_source
distsqlrun: refactor sorter to implement RowSource
rjnn authored Feb 12, 2018
2 parents ee6cd7a + bf7e445 commit 45ab257
Showing 4 changed files with 628 additions and 321 deletions.
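
The refactor named in the commit message moves the sorter toward the pull-based RowSource model, in which the consumer repeatedly asks its input for the next row rather than having a producer goroutine push rows at it. As a rough, self-contained sketch of that pull model (the row and rowSource types below are hypothetical stand-ins, not the actual distsqlrun interface, which also carries metadata and consumer-status signals):

package main

import "fmt"

// row and rowSource are hypothetical stand-ins for distsqlrun's
// EncDatumRow and RowSource.
type row []int

type rowSource interface {
	// next returns the next row, or nil once the source is exhausted.
	next() row
}

// sliceSource serves rows from a fixed slice, one per next() call.
type sliceSource struct {
	rows []row
	i    int
}

func (s *sliceSource) next() row {
	if s.i >= len(s.rows) {
		return nil
	}
	r := s.rows[s.i]
	s.i++
	return r
}

func main() {
	var src rowSource = &sliceSource{rows: []row{{3, 1}, {1, 2}}}
	// The consumer pulls rows at its own pace instead of receiving
	// them from a producer goroutine via a channel.
	for r := src.next(); r != nil; r = src.next() {
		fmt.Println(r)
	}
}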
116 changes: 33 additions & 83 deletions pkg/sql/distsqlrun/sorter.go
@@ -15,26 +15,18 @@
 package distsqlrun
 
 import (
-	"sync"
-
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
-	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/cockroach/pkg/util/mon"
-	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 )
 
-// sorter sorts the input rows according to the column ordering specified by ordering. Note
-// that this is a no-grouping aggregator and therefore it does not produce a global ordering but
-// simply guarantees an intra-stream ordering on the physical output stream.
-type sorter struct {
+// sorter sorts the input rows according to the specified ordering.
+type sorterBase struct {
 	processorBase
 
-	// input is a row source without metadata; the metadata is directed straight
-	// to out.output.
-	input NoMetadataRowSource
-	// rawInput is the true input, not wrapped in a NoMetadataRowSource.
-	rawInput RowSource
+	evalCtx *tree.EvalContext
 
+	input    RowSource
 	ordering sqlbase.ColumnOrdering
 	matchLen uint32
 	// count is the maximum number of rows that the sorter will push to the
@@ -46,101 +38,59 @@ type sorter struct {
 	tempStorage engine.Engine
 }
 
-var _ Processor = &sorter{}
-
-func newSorter(
+func newSorterBase(
 	flowCtx *FlowCtx, spec *SorterSpec, input RowSource, post *PostProcessSpec, output RowReceiver,
-) (*sorter, error) {
+) (*sorterBase, error) {
 	count := int64(0)
 	if post.Limit != 0 {
 		// The sorter needs to produce Offset + Limit rows. The ProcOutputHelper
 		// will discard the first Offset ones.
 		count = int64(post.Limit) + int64(post.Offset)
 	}
-	s := &sorter{
-		input:    MakeNoMetadataRowSource(input, output),
-		rawInput: input,
+
+	s := &sorterBase{
+		input:       input,
 		ordering:    convertToColumnOrdering(spec.OutputOrdering),
 		matchLen:    spec.OrderingMatchLen,
 		count:       count,
 		tempStorage: flowCtx.TempStorage,
+		evalCtx:     flowCtx.NewEvalCtx(),
 	}
-	if err := s.init(post, input.OutputTypes(), flowCtx, nil /* evalCtx */, output); err != nil {
+	if err := s.init(post, input.OutputTypes(), flowCtx, s.evalCtx, output); err != nil {
 		return nil, err
 	}
 	return s, nil
 }
 
-// Run is part of the processor interface.
-func (s *sorter) Run(wg *sync.WaitGroup) {
-	if wg != nil {
-		defer wg.Done()
-	}
-
-	ctx := log.WithLogTag(s.flowCtx.Ctx, "Sorter", nil)
-	ctx, span := processorSpan(ctx, "sorter")
-	defer tracing.FinishSpan(span)
-
-	if log.V(2) {
-		log.Infof(ctx, "starting sorter run")
-		defer log.Infof(ctx, "exiting sorter run")
+func newSorter(
+	flowCtx *FlowCtx, spec *SorterSpec, input RowSource, post *PostProcessSpec, output RowReceiver,
+) (Processor, error) {
+	s, err := newSorterBase(flowCtx, spec, input, post, output)
+	if err != nil {
+		return nil, err
 	}
 
-	var sv memRowContainer
-	// Enable fall back to disk if the cluster setting is set or a memory limit
-	// has been set through testing.
-	st := s.flowCtx.Settings
-	useTempStorage := settingUseTempStorageSorts.Get(&st.SV) ||
-		s.flowCtx.testingKnobs.MemoryLimitBytes > 0
-	rowContainerMon := s.flowCtx.EvalCtx.Mon
-	if s.matchLen == 0 && s.count == 0 && useTempStorage {
-		// We will use the sortAllStrategy in this case and potentially fall
-		// back to disk.
-		// Limit the memory use by creating a child monitor with a hard limit.
-		// The strategy will overflow to disk if this limit is not enough.
-		limit := s.flowCtx.testingKnobs.MemoryLimitBytes
-		if limit <= 0 {
-			limit = settingWorkMemBytes.Get(&st.SV)
-		}
-		limitedMon := mon.MakeMonitorInheritWithLimit(
-			"sortall-limited", limit, s.flowCtx.EvalCtx.Mon,
-		)
-		limitedMon.Start(ctx, s.flowCtx.EvalCtx.Mon, mon.BoundAccount{})
-		defer limitedMon.Stop(ctx)
-
-		rowContainerMon = &limitedMon
-	}
-	sv.initWithMon(s.ordering, s.rawInput.OutputTypes(), s.flowCtx.NewEvalCtx(), rowContainerMon)
-	// Construct the optimal sorterStrategy.
-	var ss sorterStrategy
 	if s.matchLen == 0 {
 		if s.count == 0 {
 			// No specified ordering match length and unspecified limit; no
 			// optimizations are possible so we simply load all rows into memory and
 			// sort all values in-place. It has a worst-case time complexity of
 			// O(n*log(n)) and a worst-case space complexity of O(n).
-			ss = newSortAllStrategy(&sv, useTempStorage)
-		} else {
-			// No specified ordering match length but specified limit; we can optimize
-			// our sort procedure by maintaining a max-heap populated with only the
-			// smallest k rows seen. It has a worst-case time complexity of
-			// O(n*log(k)) and a worst-case space complexity of O(k).
-			ss = newSortTopKStrategy(&sv, s.count)
+			return newSortAllStrategy(s), nil
 		}
-	} else {
-		// Ordering match length is specified. We will be able to use existing
-		// ordering in order to avoid loading all the rows into memory. If we're
-		// scanning an index with a prefix matching an ordering prefix, we can only
-		// accumulate values for equal fields in this prefix, sort the accumulated
-		// chunk and then output.
-		// TODO(irfansharif): Add optimization for case where both ordering match
-		// length and limit is specified.
-		ss = newSortChunksStrategy(&sv)
+		// No specified ordering match length but specified limit; we can optimize
+		// our sort procedure by maintaining a max-heap populated with only the
+		// smallest k rows seen. It has a worst-case time complexity of
+		// O(n*log(k)) and a worst-case space complexity of O(k).
+		return newSortTopKStrategy(s, s.count), nil
 	}
+	// Ordering match length is specified. We will be able to use existing
+	// ordering in order to avoid loading all the rows into memory. If we're
+	// scanning an index with a prefix matching an ordering prefix, we can only
+	// accumulate values for equal fields in this prefix, sort the accumulated
+	// chunk and then output.
+	// TODO(irfansharif): Add optimization for case where both ordering match
+	// length and limit is specified.
+	return newSortChunksStrategy(s), nil
 
-	sortErr := ss.Execute(ctx, s)
-	if sortErr != nil {
-		log.Errorf(ctx, "error sorting rows: %s", sortErr)
-	}
-	DrainAndClose(ctx, s.out.output, sortErr, s.rawInput)
 }
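
The sortTopKStrategy comment in the hunk above describes a bounded max-heap: retain only the k smallest rows seen so far, evicting the current maximum whenever a smaller row arrives, for O(n*log(k)) time and O(k) space. A minimal self-contained sketch of that technique over plain ints (not the CockroachDB row types or the actual strategy code):

package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// maxHeap is an int max-heap: the root is the largest of the k
// smallest values retained so far.
type maxHeap []int

func (h maxHeap) Len() int            { return len(h) }
func (h maxHeap) Less(i, j int) bool  { return h[i] > h[j] }
func (h maxHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *maxHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// topK returns the k smallest values in vals, in ascending order,
// using O(k) space and O(n*log(k)) time.
func topK(vals []int, k int) []int {
	h := &maxHeap{}
	heap.Init(h)
	for _, v := range vals {
		if h.Len() < k {
			heap.Push(h, v)
		} else if v < (*h)[0] {
			// v beats the current worst of the best k: replace the root
			// and restore the heap invariant.
			(*h)[0] = v
			heap.Fix(h, 0)
		}
	}
	out := append([]int(nil), (*h)...)
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(topK([]int{9, 4, 7, 1, 8, 2, 6}, 3)) // prints [1 2 4]
}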
104 changes: 75 additions & 29 deletions pkg/sql/distsqlrun/sorter_test.go
@@ -180,6 +180,38 @@ func TestSorter(t *testing.T) {
 			{v[0], v[2], v[2], v[4]},
 			{v[1], v[2], v[2], v[5]},
 		},
+	}, {
+		name: "SortInputOrderingAlreadySorted",
+		spec: SorterSpec{
+			OrderingMatchLen: 2,
+			OutputOrdering: convertToSpecOrdering(
+				sqlbase.ColumnOrdering{
+					{ColIdx: 1, Direction: asc},
+					{ColIdx: 2, Direction: asc},
+					{ColIdx: 3, Direction: asc},
+				}),
+		},
+		types: []sqlbase.ColumnType{intType, intType, intType, intType},
+		input: sqlbase.EncDatumRows{
+			{v[1], v[1], v[2], v[2]},
+			{v[0], v[1], v[2], v[3]},
+			{v[0], v[1], v[2], v[4]},
+			{v[1], v[1], v[2], v[5]},
+			{v[1], v[2], v[2], v[2]},
+			{v[0], v[2], v[2], v[3]},
+			{v[0], v[2], v[2], v[4]},
+			{v[1], v[2], v[2], v[5]},
+		},
+		expected: sqlbase.EncDatumRows{
+			{v[1], v[1], v[2], v[2]},
+			{v[0], v[1], v[2], v[3]},
+			{v[0], v[1], v[2], v[4]},
+			{v[1], v[1], v[2], v[5]},
+			{v[1], v[2], v[2], v[2]},
+			{v[0], v[2], v[2], v[3]},
+			{v[0], v[2], v[2], v[4]},
+			{v[1], v[2], v[2], v[5]},
+		},
 	},
 }

@@ -220,39 +252,53 @@ func TestSorter(t *testing.T) {
 			// 2048: A memory limit that should not be hit; the strategy will not
 			// use disk.
 			for _, memLimit := range []int64{0, 1, 1150, 2048} {
-				t.Run(fmt.Sprintf("%sMemLimit=%d", c.name, memLimit), func(t *testing.T) {
-					in := NewRowBuffer(c.types, c.input, RowBufferArgs{})
-					out := &RowBuffer{}
+				// In theory, SortAllProcessor should be able to handle all sorting
+				// strategies, as the other strategies are optimizations.
+				for _, testingForceSortAll := range []bool{false, true} {
+					t.Run(fmt.Sprintf("MemLimit=%d", memLimit), func(t *testing.T) {
+						in := NewRowBuffer(c.types, c.input, RowBufferArgs{})
+						out := &RowBuffer{}
 
-					s, err := newSorter(&flowCtx, &c.spec, in, &c.post, out)
-					if err != nil {
-						t.Fatal(err)
-					}
-					// Override the default memory limit. This will result in using
-					// a memory row container which will hit this limit and fall
-					// back to using a disk row container.
-					s.flowCtx.testingKnobs.MemoryLimitBytes = memLimit
-					s.Run(nil)
-					if !out.ProducerClosed {
-						t.Fatalf("output RowReceiver not closed")
-					}
+						var s Processor
+						if !testingForceSortAll {
+							var err error
+							s, err = newSorter(&flowCtx, &c.spec, in, &c.post, out)
+							if err != nil {
+								t.Fatal(err)
+							}
+						} else {
+							sb, err := newSorterBase(&flowCtx, &c.spec, in, &c.post, out)
+							if err != nil {
+								t.Fatal(err)
+							}
+							s = newSortAllStrategy(sb)
+						}
+						// Override the default memory limit. This will result in using
+						// a memory row container which will hit this limit and fall
+						// back to using a disk row container.
+						flowCtx.testingKnobs.MemoryLimitBytes = memLimit
+						s.Run(nil)
+						if !out.ProducerClosed {
+							t.Fatalf("output RowReceiver not closed")
+						}
 
-					var retRows sqlbase.EncDatumRows
-					for {
-						row := out.NextNoMeta(t)
-						if row == nil {
-							break
-						}
-						retRows = append(retRows, row)
-					}
+						var retRows sqlbase.EncDatumRows
+						for {
+							row := out.NextNoMeta(t)
+							if row == nil {
+								break
+							}
+							retRows = append(retRows, row)
+						}
 
-					expStr := c.expected.String(c.types)
-					retStr := retRows.String(c.types)
-					if expStr != retStr {
-						t.Errorf("invalid results; expected:\n %s\ngot:\n %s",
-							expStr, retStr)
-					}
-				})
+						expStr := c.expected.String(c.types)
+						retStr := retRows.String(c.types)
+						if expStr != retStr {
+							t.Errorf("invalid results; expected:\n %s\ngot:\n %s",
+								expStr, retStr)
+						}
+					})
+				}
 			}
 		}
 	}
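
The new SortInputOrderingAlreadySorted case feeds the sorter input that is already ordered on a two-column prefix of the requested ordering, which is the situation the sortChunksStrategy comment in sorter.go describes: buffer one chunk of rows that are equal on the matched prefix, sort that chunk on the remaining columns, emit it, and move on, so memory is bounded by the largest chunk rather than the whole input. A simplified self-contained sketch of that idea, assuming the matched prefix is the first matchLen columns (the real code matches against an arbitrary column ordering and row types):

package main

import (
	"fmt"
	"sort"
)

type row []int

// sortChunks sorts rows that already arrive ordered on their first
// matchLen columns. Only one chunk of prefix-equal rows is buffered
// at a time, so memory is bounded by the largest chunk, not by n.
func sortChunks(rows []row, matchLen int, emit func(row)) {
	samePrefix := func(a, b row) bool {
		for c := 0; c < matchLen; c++ {
			if a[c] != b[c] {
				return false
			}
		}
		return true
	}
	flush := func(chunk []row) {
		// Sort the chunk on the suffix columns; the prefix is constant
		// within a chunk, so full lexicographic order still holds.
		sort.Slice(chunk, func(i, j int) bool {
			for c := matchLen; c < len(chunk[i]); c++ {
				if chunk[i][c] != chunk[j][c] {
					return chunk[i][c] < chunk[j][c]
				}
			}
			return false
		})
		for _, r := range chunk {
			emit(r)
		}
	}
	var chunk []row
	for _, r := range rows {
		if len(chunk) > 0 && !samePrefix(chunk[0], r) {
			flush(chunk)
			chunk = chunk[:0]
		}
		chunk = append(chunk, r)
	}
	if len(chunk) > 0 {
		flush(chunk)
	}
}

func main() {
	in := []row{{1, 2, 9}, {1, 2, 3}, {1, 3, 5}, {1, 3, 1}}
	// Prints [1 2 3], [1 2 9], [1 3 1], [1 3 5], one row per line.
	sortChunks(in, 2, func(r row) { fmt.Println(r) })
}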
[diffs for the remaining 2 of 4 changed files are not shown]
