Merge pull request #5214 from RaduBerinde/limit-multi-span
sql: use new batch limit to limit all scans
RaduBerinde committed Mar 15, 2016
2 parents 4b4bede + 22e9b7d commit 8fbe56f
Showing 3 changed files with 204 additions and 98 deletions.
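Note (editorial, not part of the commit): before this change the kvFetcher could only limit the size of a KV fetch when the scan consisted of a single span, because the limit had to be expressed as that one span's count. The new client.Batch field MaxScanResults, visible in the kvfetcher.go diff below, caps the number of rows returned by the whole batch, so every span can be added to a single batch and fetched incrementally. A minimal sketch of the idea using the names that appear in the diff (illustrative fragment only, not code from the commit):

	// batchSize is kvBatchSize, or firstBatchLimit on the very first fetch.
	b := &client.Batch{MaxScanResults: batchSize}
	for _, sp := range spans {
		// The per-span count stays as an upper bound; MaxScanResults is what
		// bounds the size of each round trip across all spans.
		b.Scan(sp.start, sp.end, sp.count)
	}
	if pErr := txn.Run(b); pErr != nil {
		return pErr
	}
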
85 changes: 85 additions & 0 deletions sql/bench_test.go
@@ -585,3 +585,88 @@ func BenchmarkScan1000Limit100_Cockroach(b *testing.B) {
 func BenchmarkScan1000Limit100_Postgres(b *testing.B) {
 	benchmarkPostgres(b, func(b *testing.B, db *sql.DB) { runBenchmarkScan(b, db, 1000, 100) })
 }
+
+// runBenchmarkScanFilter benchmarks scanning (w/filter) from a table containing count1 * count2 rows.
+func runBenchmarkScanFilter(b *testing.B, db *sql.DB, count1, count2 int, limit int, filter string) {
+	if _, err := db.Exec(`DROP TABLE IF EXISTS bench.scan2`); err != nil {
+		b.Fatal(err)
+	}
+	if _, err := db.Exec(`CREATE TABLE bench.scan2 (a INT, b INT, PRIMARY KEY (a, b))`); err != nil {
+		b.Fatal(err)
+	}
+
+	var buf bytes.Buffer
+	buf.WriteString(`INSERT INTO bench.scan2 VALUES `)
+	for i := 0; i < count1; i++ {
+		for j := 0; j < count2; j++ {
+			if i+j > 0 {
+				buf.WriteString(", ")
+			}
+			fmt.Fprintf(&buf, "(%d, %d)", i, j)
+		}
+	}
+	if _, err := db.Exec(buf.String()); err != nil {
+		b.Fatal(err)
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		query := fmt.Sprintf(`SELECT * FROM bench.scan2 WHERE %s`, filter)
+		if limit != 0 {
+			query += fmt.Sprintf(` LIMIT %d`, limit)
+		}
+		rows, err := db.Query(query)
+		if err != nil {
+			b.Fatal(err)
+		}
+		n := 0
+		for rows.Next() {
+			n++
+		}
+		rows.Close()
+		if err := rows.Err(); err != nil {
+			b.Fatal(err)
+		}
+	}
+	b.StopTimer()
+
+	if _, err := db.Exec(`DROP TABLE bench.scan2`); err != nil {
+		b.Fatal(err)
+	}
+}
+
+func BenchmarkScan10000FilterLimit1_Cockroach(b *testing.B) {
+	benchmarkCockroach(b, func(b *testing.B, db *sql.DB) {
+		runBenchmarkScanFilter(b, db, 10, 1000, 1, `a IN (1, 3, 5, 7) AND b < 10*a`)
+	})
+}
+
+func BenchmarkScan10000FilterLimit1_Postgres(b *testing.B) {
+	benchmarkPostgres(b, func(b *testing.B, db *sql.DB) {
+		runBenchmarkScanFilter(b, db, 10, 1000, 1, `a IN (1, 3, 5, 7) AND b < 10*a`)
+	})
+}
+
+func BenchmarkScan10000FilterLimit10_Cockroach(b *testing.B) {
+	benchmarkCockroach(b, func(b *testing.B, db *sql.DB) {
+		runBenchmarkScanFilter(b, db, 10, 1000, 10, `a IN (1, 3, 5, 7) AND b < 10*a`)
+	})
+}
+
+func BenchmarkScan10000FilterLimit10_Postgres(b *testing.B) {
+	benchmarkPostgres(b, func(b *testing.B, db *sql.DB) {
+		runBenchmarkScanFilter(b, db, 10, 1000, 10, `a IN (1, 3, 5, 7) AND b < 10*a`)
+	})
+}
+
+func BenchmarkScan10000FilterLimit50_Cockroach(b *testing.B) {
+	benchmarkCockroach(b, func(b *testing.B, db *sql.DB) {
+		runBenchmarkScanFilter(b, db, 10, 1000, 50, `a IN (1, 3, 5, 7) AND b < 10*a`)
+	})
+}
+
+func BenchmarkScan10000FilterLimit50_Postgres(b *testing.B) {
+	benchmarkPostgres(b, func(b *testing.B, db *sql.DB) {
+		runBenchmarkScanFilter(b, db, 10, 1000, 50, `a IN (1, 3, 5, 7) AND b < 10*a`)
+	})
+}
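
These are standard Go benchmarks; assuming the package layout implied by the file path above, they could be invoked with something like:

	go test -bench 'Scan10000Filter' ./sql

(the _Postgres variants presumably also need the comparison database that benchmarkPostgres connects to).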
108 changes: 58 additions & 50 deletions sql/kvfetcher.go
@@ -128,63 +128,72 @@ func makeKVFetcher(txn *client.Txn, spans spans, reverse bool, firstBatchLimit i
 // fetch retrieves spans from the kv
 func (f *kvFetcher) fetch() *roachpb.Error {
-	// Retrieve all the spans.
-	b := &client.Batch{}
-
-	// TODO(radu): until we have a per-batch limit (issue #4696), we
-	// only do batching if we have a single span.
-	if len(f.spans) == 1 {
-		count := int64(kvBatchSize)
-		if f.firstBatchLimit != 0 && f.firstBatchLimit < count && len(f.kvs) == 0 {
-			count = f.firstBatchLimit
-		}
-		if f.spans[0].count != 0 {
-			if f.spans[0].count <= f.totalFetched {
-				panic(fmt.Sprintf("trying to fetch beyond span count %d (fetched: %d)",
-					f.spans[0].count, f.totalFetched))
-			}
-			remaining := f.spans[0].count - f.totalFetched
-			if count > remaining {
-				count = remaining
-			}
-		}
-		if f.reverse {
-			end := f.spans[0].end
-			if len(f.kvs) > 0 {
-				// the new range ends at the last key (non-inclusive)
-				end = f.kvs[len(f.kvs)-1].Key
-				if end.Equal(f.spans[0].start) {
-					// No more keys
-					f.kvs = nil
-					f.fetchEnd = true
-					return nil
-				}
-			}
-			b.ReverseScan(f.spans[0].start, end, count)
-		} else {
-			start := f.spans[0].start
-			if len(f.kvs) > 0 {
-				// the new range starts after the last key
-				start = f.kvs[len(f.kvs)-1].Key.ShallowNext()
-				if start.Equal(f.spans[0].end) {
-					// No more keys
-					f.kvs = nil
-					f.fetchEnd = true
-					return nil
-				}
-			}
-			b.Scan(start, f.spans[0].end, count)
-		}
-	} else {
-		if f.reverse {
-			for i := len(f.spans) - 1; i >= 0; i-- {
-				b.ReverseScan(f.spans[i].start, f.spans[i].end, f.spans[i].count)
-			}
-		} else {
-			for i := 0; i < len(f.spans); i++ {
-				b.Scan(f.spans[i].start, f.spans[i].end, f.spans[i].count)
-			}
-		}
-	}
+	batchSize := int64(kvBatchSize)
+	if f.firstBatchLimit != 0 && len(f.kvs) == 0 && f.firstBatchLimit < batchSize {
+		batchSize = f.firstBatchLimit
+	}
+
+	b := &client.Batch{MaxScanResults: batchSize}
+
+	var resumeKey roachpb.Key
+	if len(f.kvs) > 0 {
+		resumeKey = f.kvs[len(f.kvs)-1].Key
+		// To resume forward scans we will set the (inclusive) scan start to the Next of the last
+		// received key. To resume reverse scans we will set the (exclusive) scan end to the last
+		// received key.
+		if !f.reverse {
+			resumeKey = resumeKey.ShallowNext()
+		}
+	}
+
+	atEnd := true
+	if !f.reverse {
+		for i := 0; i < len(f.spans); i++ {
+			start := f.spans[i].start
+			if resumeKey != nil {
+				if resumeKey.Compare(f.spans[i].end) >= 0 {
+					// We are resuming from a key after this span.
+					continue
+				}
+				if resumeKey.Compare(start) > 0 {
+					// We are resuming from a key inside this span.
+					// In this case we should technically reduce the max count for the span; but
+					// since this count is only an optimization it's not incorrect to retrieve more
+					// keys for the span.
+					start = resumeKey
+				}
+			}
+			atEnd = false
+			b.Scan(start, f.spans[i].end, f.spans[i].count)
+		}
+	} else {
+		for i := len(f.spans) - 1; i >= 0; i-- {
+			end := f.spans[i].end
+			if resumeKey != nil {
+				if resumeKey.Compare(f.spans[i].start) <= 0 {
+					// We are resuming from a key before this span.
+					continue
+				}
+				if resumeKey.Compare(end) < 0 {
+					// We are resuming from a key inside this span.
+					// In this case we should technically reduce the max count for the span; but
+					// since this count is only an optimization it's not incorrect to retrieve more
+					// keys for the span.
+					end = resumeKey
+				}
+			}
+			atEnd = false
+			b.ReverseScan(f.spans[i].start, end, f.spans[i].count)
+		}
+	}
+
+	if atEnd {
+		// The last scan happened to finish just at the end of the last span.
+		f.kvs = nil
+		f.fetchEnd = true
+		return nil
+	}
 
 	if pErr := f.txn.Run(b); pErr != nil {
 		return pErr
 	}
@@ -206,8 +215,7 @@ func (f *kvFetcher) fetch() *roachpb.Error {
 	f.totalFetched += int64(len(f.kvs))
 	f.kvIndex = 0
 
-	if !(len(f.spans) == 1 && len(f.kvs) == kvBatchSize &&
-		(f.spans[0].count == 0 || f.spans[0].count > f.totalFetched)) {
+	if int64(len(f.kvs)) < batchSize {
 		f.fetchEnd = true
 	}
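
The resumption rule above (skip spans the resume key has already passed, truncate the span that contains it) is the core of the multi-span change. A self-contained sketch of the forward-scan case, using simplified string keys and a hypothetical span type rather than the fetcher's own types:

	package main

	import "fmt"

	// span covers the key range [start, end); keys compare lexicographically.
	// This is a simplified stand-in for the fetcher's span type.
	type span struct{ start, end string }

	// remainingSpans returns the portions of ordered, non-overlapping spans that a
	// forward scan still has to cover, given that everything before resume has
	// already been fetched. An empty resume means nothing was fetched yet.
	func remainingSpans(spans []span, resume string) []span {
		if resume == "" {
			return spans
		}
		var out []span
		for _, sp := range spans {
			if resume >= sp.end {
				// The resume key is at or past this span's end: span is done.
				continue
			}
			if resume > sp.start {
				// The resume key falls inside this span: continue from it.
				sp.start = resume
			}
			out = append(out, sp)
		}
		return out
	}

	func main() {
		spans := []span{{"a", "c"}, {"f", "k"}}
		// Resuming from "g": [a, c) is already done, [f, k) shrinks to [g, k).
		fmt.Println(remainingSpans(spans, "g"))
	}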

109 changes: 61 additions & 48 deletions sql/scan_test.go
@@ -31,8 +31,8 @@ import (
"github.com/cockroachdb/cockroach/util/leaktest"
 )
 
-// genRanges generates ordered, non-overlaping ranges with values in [0 and valRange).
-func genRanges(num int, valRange int) [][2]int {
+// genAs returns num random distinct ordered values in [0, valRange).
+func genValues(num, valRange int) []int {
 	// Generate num _distinct_ values. We do this by generating a partial permutation.
 	perm := make([]int, valRange)
 	for i := 0; i < valRange; i++ {
@@ -46,42 +46,51 @@ func genRanges(num int, valRange int) [][2]int {
 	perm = perm[:num]
 	// Sort the values. These distinct values will be the starts of our ranges.
 	sort.Ints(perm)
-	res := make([][2]int, num)
-	for i := 0; i < num; i++ {
-		res[i][0] = perm[i]
-		next := valRange
-		if i < num-1 {
-			next = perm[i+1]
-		}
-		// Pick a random end in the range [perm[i], next).
-		res[i][1] = perm[i] + rand.Int()%(next-perm[i])
-	}
-	return res
+	return perm
 }
 
-func testScanBatchQuery(t *testing.T, db *sql.DB, numRanges int, numRows int, reverse bool) {
-	ranges := genRanges(numRanges, numRows)
-	expected := []int(nil)
+// testScanBatchQuery runs a query of the form
+//   SELECT a,B FROM test.scan WHERE a IN (1,5,3..) AND b >= 5 AND b <= 10
+// numSpans controls the number of possible values for a.
+func testScanBatchQuery(t *testing.T, db *sql.DB, numSpans, numAs, numBs int, reverse bool) {
+	// Generate numSpans values for A
+	aVals := genValues(numSpans, numAs)
+
+	// Generate a random range for B
+	bStart := rand.Int() % numBs
+	bEnd := bStart + rand.Int()%(numBs-bStart)
+
+	var expected [][2]int
+	for _, a := range aVals {
+		for b := bStart; b <= bEnd; b++ {
+			expected = append(expected, [2]int{a, b})
+		}
+	}
+
+	if len(aVals) == 0 {
+		// No filter on a.
+		for a := 0; a < numAs; a++ {
+			for b := bStart; b <= bEnd; b++ {
+				expected = append(expected, [2]int{a, b})
+			}
+		}
+	}
+
 	var buf bytes.Buffer
-	buf.WriteString(`SELECT k FROM test.scan`)
-	for i, r := range ranges {
+	buf.WriteString(fmt.Sprintf("SELECT a,b FROM test.scan WHERE b >= %d AND b <= %d", bStart, bEnd))
+	for i, a := range aVals {
 		if i == 0 {
-			buf.WriteString(" WHERE ")
+			buf.WriteString(fmt.Sprintf(" AND a IN (%d", a))
 		} else {
-			buf.WriteString(" OR ")
-		}
-		buf.WriteString(fmt.Sprintf(`(k >= %d AND k <= %d)`, r[0], r[1]))
-		for j := r[0]; j <= r[1]; j++ {
-			expected = append(expected, j)
+			buf.WriteString(fmt.Sprintf(",%d", a))
 		}
 	}
-	if len(ranges) == 0 {
-		for j := 0; j < numRows; j++ {
-			expected = append(expected, j)
-		}
+	if len(aVals) > 0 {
+		buf.WriteString(")")
 	}
 
 	if reverse {
-		buf.WriteString(" ORDER BY k DESC")
+		buf.WriteString(" ORDER BY a DESC, b DESC")
 		for i, j := 0, len(expected)-1; i < j; i, j = i+1, j-1 {
 			expected[i], expected[j] = expected[j], expected[i]
 		}
@@ -95,13 +104,14 @@ func testScanBatchQuery(t *testing.T, db *sql.DB, numRanges int, numRows int, re
 		if n >= len(expected) {
 			t.Fatalf("too many rows (expected %d)", len(expected))
 		}
-		var val int
-		err = rows.Scan(&val)
+		var a, b int
+		err = rows.Scan(&a, &b)
 		if err != nil {
 			t.Fatal(err)
 		}
-		if val != expected[n] {
-			t.Errorf("row %d: invalid value %d (expected %d)", n, val, expected[n])
+		if a != expected[n][0] || b != expected[n][1] {
+			t.Errorf("row %d: invalid values %d,%d (expected %d,%d)",
+				n, a, b, expected[n][0], expected[n][1])
 		}
 		n++
 	}
@@ -140,42 +150,45 @@ func TestScanBatches(t *testing.T) {
 	restore := csql.SetKVBatchSize(10)
 	defer restore()
 
-	numRows := 100
+	numAs := 5
+	numBs := 20
 
 	if _, err := db.Exec(`DROP TABLE IF EXISTS test.scan`); err != nil {
 		t.Fatal(err)
 	}
-	if _, err := db.Exec(`CREATE TABLE test.scan (k INT PRIMARY KEY, v STRING)`); err != nil {
+	if _, err := db.Exec(`CREATE TABLE test.scan (a INT, b INT, v STRING, PRIMARY KEY (a, b))`); err != nil {
 		t.Fatal(err)
 	}
 
 	var buf bytes.Buffer
 	buf.WriteString(`INSERT INTO test.scan VALUES `)
-	for i := 0; i < numRows; i++ {
-		if i > 0 {
-			buf.WriteString(", ")
-		}
-		if i%2 == 0 {
-			fmt.Fprintf(&buf, "(%d, 'str%d')", i, i)
-		} else {
-			// Every other row doesn't get the string value (to have NULLs).
-			fmt.Fprintf(&buf, "(%d, NULL)", i)
+	for a := 0; a < numAs; a++ {
+		for b := 0; b < numBs; b++ {
+			if a+b > 0 {
+				buf.WriteString(", ")
+			}
+			if (a+b)%2 == 0 {
+				fmt.Fprintf(&buf, "(%d, %d, 'str%d%d')", a, b, a, b)
+			} else {
+				// Every other row doesn't get the string value (to have NULLs).
+				fmt.Fprintf(&buf, "(%d, %d, NULL)", a, b)
+			}
 		}
 	}
 	if _, err := db.Exec(buf.String()); err != nil {
 		t.Fatal(err)
 	}
 
 	// The table will have one key for the even rows, and two keys for the odd rows.
-	batchSizes := []int{1, 2, 3, 5, 10, 13, 100, 3*numRows/2 - 1, 3 * numRows / 2, 3*numRows/2 + 1}
-	// We can test with at most one one span for now (see kvFetcher.fetch)
-	numSpanValues := []int{0, 1}
+	numKeys := 3 * numAs * numBs / 2
+	batchSizes := []int{1, 2, 3, 5, 10, 13, 100, numKeys - 1, numKeys, numKeys + 1}
+	numSpanValues := []int{0, 1, 2, 3}
 
 	for _, batch := range batchSizes {
 		csql.SetKVBatchSize(batch)
 		for _, numSpans := range numSpanValues {
-			testScanBatchQuery(t, db, numSpans, numRows, false)
-			testScanBatchQuery(t, db, numSpans, numRows, true)
+			testScanBatchQuery(t, db, numSpans, numAs, numBs, false)
+			testScanBatchQuery(t, db, numSpans, numAs, numBs, true)
 		}
 	}
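
Note (editorial, not part of the commit): with numAs = 5 and numBs = 20 the table has 100 rows, and per the comment above roughly half of them encode to one KV pair and half to two, giving numKeys = 3 * 5 * 20 / 2 = 150. The batch sizes numKeys - 1, numKeys, and numKeys + 1 therefore exercise the boundary where a batch comes back exactly full, which is the case the new int64(len(f.kvs)) < batchSize check in kvfetcher.go has to handle.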

