sql: use new batch limit to limit all scans #5214

Merged (2 commits, Mar 15, 2016)
85 changes: 85 additions & 0 deletions sql/bench_test.go
@@ -585,3 +585,88 @@ func BenchmarkScan1000Limit100_Cockroach(b *testing.B) {
 func BenchmarkScan1000Limit100_Postgres(b *testing.B) {
     benchmarkPostgres(b, func(b *testing.B, db *sql.DB) { runBenchmarkScan(b, db, 1000, 100) })
 }
+
+// runBenchmarkScanFilter benchmarks scanning (w/filter) from a table containing count1 * count2 rows.
+func runBenchmarkScanFilter(b *testing.B, db *sql.DB, count1, count2 int, limit int, filter string) {
+    if _, err := db.Exec(`DROP TABLE IF EXISTS bench.scan2`); err != nil {
+        b.Fatal(err)
+    }
+    if _, err := db.Exec(`CREATE TABLE bench.scan2 (a INT, b INT, PRIMARY KEY (a, b))`); err != nil {
+        b.Fatal(err)
+    }
+
+    var buf bytes.Buffer
+    buf.WriteString(`INSERT INTO bench.scan2 VALUES `)
+    for i := 0; i < count1; i++ {
+        for j := 0; j < count2; j++ {
+            if i+j > 0 {
+                buf.WriteString(", ")
+            }
+            fmt.Fprintf(&buf, "(%d, %d)", i, j)
+        }
+    }
+    if _, err := db.Exec(buf.String()); err != nil {
+        b.Fatal(err)
+    }
+
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        query := fmt.Sprintf(`SELECT * FROM bench.scan2 WHERE %s`, filter)
+        if limit != 0 {
+            query += fmt.Sprintf(` LIMIT %d`, limit)
+        }
+        rows, err := db.Query(query)
+        if err != nil {
+            b.Fatal(err)
+        }
+        n := 0
+        for rows.Next() {
+            n++
+        }
+        rows.Close()
+        if err := rows.Err(); err != nil {
+            b.Fatal(err)
+        }
+    }
+    b.StopTimer()
+
+    if _, err := db.Exec(`DROP TABLE bench.scan2`); err != nil {
+        b.Fatal(err)
+    }
+}
+
+func BenchmarkScan10000FilterLimit1_Cockroach(b *testing.B) {
+    benchmarkCockroach(b, func(b *testing.B, db *sql.DB) {
+        runBenchmarkScanFilter(b, db, 10, 1000, 1, `a IN (1, 3, 5, 7) AND b < 10*a`)
+    })
+}
+
+func BenchmarkScan10000FilterLimit1_Postgres(b *testing.B) {
+    benchmarkPostgres(b, func(b *testing.B, db *sql.DB) {
+        runBenchmarkScanFilter(b, db, 10, 1000, 1, `a IN (1, 3, 5, 7) AND b < 10*a`)
+    })
+}
+
+func BenchmarkScan10000FilterLimit10_Cockroach(b *testing.B) {
+    benchmarkCockroach(b, func(b *testing.B, db *sql.DB) {
+        runBenchmarkScanFilter(b, db, 10, 1000, 10, `a IN (1, 3, 5, 7) AND b < 10*a`)
+    })
+}
+
+func BenchmarkScan10000FilterLimit10_Postgres(b *testing.B) {
+    benchmarkPostgres(b, func(b *testing.B, db *sql.DB) {
+        runBenchmarkScanFilter(b, db, 10, 1000, 10, `a IN (1, 3, 5, 7) AND b < 10*a`)
+    })
+}
+
+func BenchmarkScan10000FilterLimit50_Cockroach(b *testing.B) {
+    benchmarkCockroach(b, func(b *testing.B, db *sql.DB) {
+        runBenchmarkScanFilter(b, db, 10, 1000, 50, `a IN (1, 3, 5, 7) AND b < 10*a`)
+    })
+}
+
+func BenchmarkScan10000FilterLimit50_Postgres(b *testing.B) {
+    benchmarkPostgres(b, func(b *testing.B, db *sql.DB) {
+        runBenchmarkScanFilter(b, db, 10, 1000, 50, `a IN (1, 3, 5, 7) AND b < 10*a`)
+    })
+}
108 changes: 58 additions & 50 deletions sql/kvfetcher.go
@@ -128,63 +128,72 @@ func makeKVFetcher(txn *client.Txn, spans spans, reverse bool, firstBatchLimit i
 // fetch retrieves spans from the kv
 func (f *kvFetcher) fetch() *roachpb.Error {
     // Retrieve all the spans.
-    b := &client.Batch{}
-
-    // TODO(radu): until we have a per-batch limit (issue #4696), we
-    // only do batching if we have a single span.
-    if len(f.spans) == 1 {
-        count := int64(kvBatchSize)
-        if f.firstBatchLimit != 0 && f.firstBatchLimit < count && len(f.kvs) == 0 {
-            count = f.firstBatchLimit
-        }
-        if f.spans[0].count != 0 {
-            if f.spans[0].count <= f.totalFetched {
-                panic(fmt.Sprintf("trying to fetch beyond span count %d (fetched: %d)",
-                    f.spans[0].count, f.totalFetched))
-            }
-            remaining := f.spans[0].count - f.totalFetched
-            if count > remaining {
-                count = remaining
-            }
+    batchSize := int64(kvBatchSize)
+    if f.firstBatchLimit != 0 && len(f.kvs) == 0 && f.firstBatchLimit < batchSize {
+        batchSize = f.firstBatchLimit
+    }
+
+    b := &client.Batch{MaxScanResults: batchSize}
+
+    var resumeKey roachpb.Key
+    if len(f.kvs) > 0 {
+        resumeKey = f.kvs[len(f.kvs)-1].Key
+        // To resume forward scans we will set the (inclusive) scan start to the Next of the last
+        // received key. To resume reverse scans we will set the (exclusive) scan end to the last
+        // received key.
+        if !f.reverse {
+            resumeKey = resumeKey.ShallowNext()
         }
-        if f.reverse {
-            end := f.spans[0].end
-            if len(f.kvs) > 0 {
-                // the new range ends at the last key (non-inclusive)
-                end = f.kvs[len(f.kvs)-1].Key
-                if end.Equal(f.spans[0].start) {
-                    // No more keys
-                    f.kvs = nil
-                    f.fetchEnd = true
-                    return nil
+    }
+
+    atEnd := true
+    if !f.reverse {
+        for i := 0; i < len(f.spans); i++ {
+            start := f.spans[i].start
+            if resumeKey != nil {
+                if resumeKey.Compare(f.spans[i].end) >= 0 {
+                    // We are resuming from a key after this span.
+                    continue
                 }
-            }
-            b.ReverseScan(f.spans[0].start, end, count)
-        } else {
-            start := f.spans[0].start
-            if len(f.kvs) > 0 {
-                // the new range starts after the last key
-                start = f.kvs[len(f.kvs)-1].Key.ShallowNext()
-                if start.Equal(f.spans[0].end) {
-                    // No more keys
-                    f.kvs = nil
-                    f.fetchEnd = true
-                    return nil
+                if resumeKey.Compare(start) > 0 {
+                    // We are resuming from a key inside this span.
+                    // In this case we should technically reduce the max count for the span; but
+                    // since this count is only an optimization it's not incorrect to retrieve more
+                    // keys for the span.
+                    start = resumeKey
                 }
             }
-            b.Scan(start, f.spans[0].end, count)
+            atEnd = false
+            b.Scan(start, f.spans[i].end, f.spans[i].count)
         }
     } else {
-        if f.reverse {
-            for i := len(f.spans) - 1; i >= 0; i-- {
-                b.ReverseScan(f.spans[i].start, f.spans[i].end, f.spans[i].count)
-            }
-        } else {
-            for i := 0; i < len(f.spans); i++ {
-                b.Scan(f.spans[i].start, f.spans[i].end, f.spans[i].count)
+        for i := len(f.spans) - 1; i >= 0; i-- {
+            end := f.spans[i].end
+            if resumeKey != nil {
+                if resumeKey.Compare(f.spans[i].start) <= 0 {
+                    // We are resuming from a key before this span.
+                    continue
+                }
+                if resumeKey.Compare(end) < 0 {
+                    // We are resuming from a key inside this span.
+                    // In this case we should technically reduce the max count for the span; but
+                    // since this count is only an optimization it's not incorrect to retrieve more
+                    // keys for the span.
+                    end = resumeKey
+                }
             }
+            atEnd = false
+            b.ReverseScan(f.spans[i].start, end, f.spans[i].count)
         }
     }
 
+    if atEnd {
+        // The last scan happened to finish just at the end of the last span.
+        f.kvs = nil
+        f.fetchEnd = true
+        return nil
+    }
+
     if pErr := f.txn.Run(b); pErr != nil {
         return pErr
+    }
@@ -206,8 +215,7 @@ func (f *kvFetcher) fetch() *roachpb.Error {
     f.totalFetched += int64(len(f.kvs))
     f.kvIndex = 0
 
-    if !(len(f.spans) == 1 && len(f.kvs) == kvBatchSize &&
-        (f.spans[0].count == 0 || f.spans[0].count > f.totalFetched)) {
+    if int64(len(f.kvs)) < batchSize {
         f.fetchEnd = true
     }
 
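For readers following the new fetch logic above, here is a minimal, self-contained sketch of the same idea: a single batch-wide result limit combined with a resume key that skips spans already covered and trims the span the previous batch stopped in. The span type, the in-memory key slice, and fetchForward below are hypothetical stand-ins used only for illustration; they are not the client.Batch / MaxScanResults API that the diff itself uses.

// A minimal model of batch-limited, resumable span scans. All types and
// helpers here are illustrative stand-ins, not the CockroachDB client API.
package main

import (
    "bytes"
    "fmt"
)

// span is a key range [start, end).
type span struct {
    start, end []byte
}

// fetchForward collects up to batchSize keys from data (sorted ascending) that
// fall inside the given spans, starting after resumeKey (nil means from the
// beginning). It returns the keys found and the resume key for the next call;
// a nil resume key means the scan is finished.
func fetchForward(data [][]byte, spans []span, resumeKey []byte, batchSize int) (fetched [][]byte, next []byte) {
    for _, sp := range spans {
        start := sp.start
        if resumeKey != nil {
            if bytes.Compare(resumeKey, sp.end) >= 0 {
                // Resuming from a key at or past this span: skip it entirely.
                continue
            }
            if bytes.Compare(resumeKey, start) > 0 {
                // Resuming from inside this span: trim its start.
                start = resumeKey
            }
        }
        for _, k := range data {
            if bytes.Compare(k, start) >= 0 && bytes.Compare(k, sp.end) < 0 {
                fetched = append(fetched, k)
                if len(fetched) == batchSize {
                    // Batch-wide limit reached: resume just past the last key
                    // (append a zero byte, the usual "next key" trick).
                    return fetched, append(append([]byte(nil), k...), 0)
                }
            }
        }
    }
    // Fewer than batchSize keys were found, so all spans are exhausted.
    return fetched, nil
}

func main() {
    data := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")}
    spans := []span{{start: []byte("a"), end: []byte("c")}, {start: []byte("d"), end: []byte("g")}}
    var resume []byte
    for batch := 1; ; batch++ {
        keys, next := fetchForward(data, spans, resume, 2)
        fmt.Printf("batch %d: %q\n", batch, keys)
        if next == nil {
            break
        }
        resume = next
    }
}

Run over six keys split across two spans with a batch limit of 2, this prints batch 1: ["a" "b"], batch 2: ["d" "e"], batch 3: ["f"]; like kvFetcher.fetch, the caller keeps issuing batches until one comes back shorter than the limit.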
109 changes: 61 additions & 48 deletions sql/scan_test.go
@@ -31,8 +31,8 @@ import (
     "github.com/cockroachdb/cockroach/util/leaktest"
 )
 
-// genRanges generates ordered, non-overlaping ranges with values in [0 and valRange).
-func genRanges(num int, valRange int) [][2]int {
+// genAs returns num random distinct ordered values in [0, valRange).
+func genValues(num, valRange int) []int {
     // Generate num _distinct_ values. We do this by generating a partial permutation.
     perm := make([]int, valRange)
     for i := 0; i < valRange; i++ {
@@ -46,42 +46,51 @@ func genRanges(num int, valRange int) [][2]int {
     perm = perm[:num]
     // Sort the values. These distinct values will be the starts of our ranges.
     sort.Ints(perm)
-    res := make([][2]int, num)
-    for i := 0; i < num; i++ {
-        res[i][0] = perm[i]
-        next := valRange
-        if i < num-1 {
-            next = perm[i+1]
+    return perm
+}
+
+// testScanBatchQuery runs a query of the form
+//   SELECT a,B FROM test.scan WHERE a IN (1,5,3..) AND b >= 5 AND b <= 10
+// numSpans controls the number of possible values for a.
+func testScanBatchQuery(t *testing.T, db *sql.DB, numSpans, numAs, numBs int, reverse bool) {
+    // Generate numSpans values for A
+    aVals := genValues(numSpans, numAs)
+
+    // Generate a random range for B
+    bStart := rand.Int() % numBs
+    bEnd := bStart + rand.Int()%(numBs-bStart)
+
+    var expected [][2]int
+    for _, a := range aVals {
+        for b := bStart; b <= bEnd; b++ {
+            expected = append(expected, [2]int{a, b})
         }
+    }
+
+    if len(aVals) == 0 {
+        // No filter on a.
+        for a := 0; a < numAs; a++ {
+            for b := bStart; b <= bEnd; b++ {
+                expected = append(expected, [2]int{a, b})
+            }
+        }
-        // Pick a random end in the range [perm[i], next).
-        res[i][1] = perm[i] + rand.Int()%(next-perm[i])
     }
-    return res
-}
 
-func testScanBatchQuery(t *testing.T, db *sql.DB, numRanges int, numRows int, reverse bool) {
-    ranges := genRanges(numRanges, numRows)
-    expected := []int(nil)
     var buf bytes.Buffer
-    buf.WriteString(`SELECT k FROM test.scan`)
-    for i, r := range ranges {
+    buf.WriteString(fmt.Sprintf("SELECT a,b FROM test.scan WHERE b >= %d AND b <= %d", bStart, bEnd))
+    for i, a := range aVals {
         if i == 0 {
-            buf.WriteString(" WHERE ")
+            buf.WriteString(fmt.Sprintf(" AND a IN (%d", a))
         } else {
-            buf.WriteString(" OR ")
-        }
-        buf.WriteString(fmt.Sprintf(`(k >= %d AND k <= %d)`, r[0], r[1]))
-        for j := r[0]; j <= r[1]; j++ {
-            expected = append(expected, j)
+            buf.WriteString(fmt.Sprintf(",%d", a))
         }
     }
-    if len(ranges) == 0 {
-        for j := 0; j < numRows; j++ {
-            expected = append(expected, j)
-        }
+    if len(aVals) > 0 {
+        buf.WriteString(")")
     }
 
     if reverse {
-        buf.WriteString(" ORDER BY k DESC")
+        buf.WriteString(" ORDER BY a DESC, b DESC")
         for i, j := 0, len(expected)-1; i < j; i, j = i+1, j-1 {
             expected[i], expected[j] = expected[j], expected[i]
+        }
@@ -95,13 +104,14 @@ func testScanBatchQuery(t *testing.T, db *sql.DB, numRanges int, numRows int, re
         if n >= len(expected) {
             t.Fatalf("too many rows (expected %d)", len(expected))
         }
-        var val int
-        err = rows.Scan(&val)
+        var a, b int
+        err = rows.Scan(&a, &b)
         if err != nil {
             t.Fatal(err)
         }
-        if val != expected[n] {
-            t.Errorf("row %d: invalid value %d (expected %d)", n, val, expected[n])
+        if a != expected[n][0] || b != expected[n][1] {
+            t.Errorf("row %d: invalid values %d,%d (expected %d,%d)",
+                n, a, b, expected[n][0], expected[n][1])
         }
         n++
     }
@@ -140,42 +150,45 @@ func TestScanBatches(t *testing.T) {
     restore := csql.SetKVBatchSize(10)
     defer restore()
 
-    numRows := 100
+    numAs := 5
+    numBs := 20
 
     if _, err := db.Exec(`DROP TABLE IF EXISTS test.scan`); err != nil {
         t.Fatal(err)
     }
-    if _, err := db.Exec(`CREATE TABLE test.scan (k INT PRIMARY KEY, v STRING)`); err != nil {
+    if _, err := db.Exec(`CREATE TABLE test.scan (a INT, b INT, v STRING, PRIMARY KEY (a, b))`); err != nil {
         t.Fatal(err)
     }
 
     var buf bytes.Buffer
     buf.WriteString(`INSERT INTO test.scan VALUES `)
-    for i := 0; i < numRows; i++ {
-        if i > 0 {
-            buf.WriteString(", ")
-        }
-        if i%2 == 0 {
-            fmt.Fprintf(&buf, "(%d, 'str%d')", i, i)
-        } else {
-            // Every other row doesn't get the string value (to have NULLs).
-            fmt.Fprintf(&buf, "(%d, NULL)", i)
+    for a := 0; a < numAs; a++ {
+        for b := 0; b < numBs; b++ {
+            if a+b > 0 {
+                buf.WriteString(", ")
+            }
+            if (a+b)%2 == 0 {
+                fmt.Fprintf(&buf, "(%d, %d, 'str%d%d')", a, b, a, b)
+            } else {
+                // Every other row doesn't get the string value (to have NULLs).
+                fmt.Fprintf(&buf, "(%d, %d, NULL)", a, b)
+            }
         }
     }
     if _, err := db.Exec(buf.String()); err != nil {
         t.Fatal(err)
     }
 
     // The table will have one key for the even rows, and two keys for the odd rows.
-    batchSizes := []int{1, 2, 3, 5, 10, 13, 100, 3*numRows/2 - 1, 3 * numRows / 2, 3*numRows/2 + 1}
-    // We can test with at most one one span for now (see kvFetcher.fetch)
-    numSpanValues := []int{0, 1}
+    numKeys := 3 * numAs * numBs / 2
+    batchSizes := []int{1, 2, 3, 5, 10, 13, 100, numKeys - 1, numKeys, numKeys + 1}
+    numSpanValues := []int{0, 1, 2, 3}
 
     for _, batch := range batchSizes {
         csql.SetKVBatchSize(batch)
         for _, numSpans := range numSpanValues {
-            testScanBatchQuery(t, db, numSpans, numRows, false)
-            testScanBatchQuery(t, db, numSpans, numRows, true)
+            testScanBatchQuery(t, db, numSpans, numAs, numBs, false)
+            testScanBatchQuery(t, db, numSpans, numAs, numBs, true)
        }
    }
 