From 4b3e9c5be17d81162f4d3944165841ecfb5d0af1 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Sun, 13 Mar 2016 18:31:06 -0400 Subject: [PATCH 1/2] sql: benchmarks for multi-span scan with limit The benchmarks use a table with 10 * 1000 rows. The query involves 4 spans of 1000 rows each, with the filter passing on only the first 10,30,50,70 rows respectively. --- sql/bench_test.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/sql/bench_test.go b/sql/bench_test.go index 3bc4b2b4b423..261f35c0d8dd 100644 --- a/sql/bench_test.go +++ b/sql/bench_test.go @@ -585,3 +585,88 @@ func BenchmarkScan1000Limit100_Cockroach(b *testing.B) { func BenchmarkScan1000Limit100_Postgres(b *testing.B) { benchmarkPostgres(b, func(b *testing.B, db *sql.DB) { runBenchmarkScan(b, db, 1000, 100) }) } + +// runBenchmarkScanFilter benchmarks scanning (w/filter) from a table containing count1 * count2 rows. +func runBenchmarkScanFilter(b *testing.B, db *sql.DB, count1, count2 int, limit int, filter string) { + if _, err := db.Exec(`DROP TABLE IF EXISTS bench.scan2`); err != nil { + b.Fatal(err) + } + if _, err := db.Exec(`CREATE TABLE bench.scan2 (a INT, b INT, PRIMARY KEY (a, b))`); err != nil { + b.Fatal(err) + } + + var buf bytes.Buffer + buf.WriteString(`INSERT INTO bench.scan2 VALUES `) + for i := 0; i < count1; i++ { + for j := 0; j < count2; j++ { + if i+j > 0 { + buf.WriteString(", ") + } + fmt.Fprintf(&buf, "(%d, %d)", i, j) + } + } + if _, err := db.Exec(buf.String()); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + query := fmt.Sprintf(`SELECT * FROM bench.scan2 WHERE %s`, filter) + if limit != 0 { + query += fmt.Sprintf(` LIMIT %d`, limit) + } + rows, err := db.Query(query) + if err != nil { + b.Fatal(err) + } + n := 0 + for rows.Next() { + n++ + } + rows.Close() + if err := rows.Err(); err != nil { + b.Fatal(err) + } + } + b.StopTimer() + + if _, err := db.Exec(`DROP TABLE bench.scan2`); err != nil { + b.Fatal(err) + } +} + +func BenchmarkScan10000FilterLimit1_Cockroach(b *testing.B) { + benchmarkCockroach(b, func(b *testing.B, db *sql.DB) { + runBenchmarkScanFilter(b, db, 10, 1000, 1, `a IN (1, 3, 5, 7) AND b < 10*a`) + }) +} + +func BenchmarkScan10000FilterLimit1_Postgres(b *testing.B) { + benchmarkPostgres(b, func(b *testing.B, db *sql.DB) { + runBenchmarkScanFilter(b, db, 10, 1000, 1, `a IN (1, 3, 5, 7) AND b < 10*a`) + }) +} + +func BenchmarkScan10000FilterLimit10_Cockroach(b *testing.B) { + benchmarkCockroach(b, func(b *testing.B, db *sql.DB) { + runBenchmarkScanFilter(b, db, 10, 1000, 10, `a IN (1, 3, 5, 7) AND b < 10*a`) + }) +} + +func BenchmarkScan10000FilterLimit10_Postgres(b *testing.B) { + benchmarkPostgres(b, func(b *testing.B, db *sql.DB) { + runBenchmarkScanFilter(b, db, 10, 1000, 10, `a IN (1, 3, 5, 7) AND b < 10*a`) + }) +} + +func BenchmarkScan10000FilterLimit50_Cockroach(b *testing.B) { + benchmarkCockroach(b, func(b *testing.B, db *sql.DB) { + runBenchmarkScanFilter(b, db, 10, 1000, 50, `a IN (1, 3, 5, 7) AND b < 10*a`) + }) +} + +func BenchmarkScan10000FilterLimit50_Postgres(b *testing.B) { + benchmarkPostgres(b, func(b *testing.B, db *sql.DB) { + runBenchmarkScanFilter(b, db, 10, 1000, 50, `a IN (1, 3, 5, 7) AND b < 10*a`) + }) +} From 22e9b7d964379300fb831c15eef3fcd60fbea3f1 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Sun, 13 Mar 2016 13:00:23 -0400 Subject: [PATCH 2/2] sql: use new batch limit to limit all scans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We change the kvFetcher code to use the new per-batch limit instead of limiting only single-span scans. The scan_test code is modified to work with multiple spans (previous code assumed OR branches would result in spans but that's not the case) ``` name old time/op new time/op delta Scan10000FilterLimit1_Cockroach-4 10.1ms ±51% 0.3ms ±23% -97.43% (p=0.000 n=10+9) Scan10000FilterLimit10_Cockroach-4 9.07ms ±17% 0.27ms ± 2% -97.05% (p=0.000 n=9+9) Scan10000FilterLimit50_Cockroach-4 10.5ms ± 8% 10.8ms ± 9% ~ (p=0.211 n=10+9) ``` --- sql/kvfetcher.go | 108 ++++++++++++++++++++++++---------------------- sql/scan_test.go | 109 ++++++++++++++++++++++++++--------------------- 2 files changed, 119 insertions(+), 98 deletions(-) diff --git a/sql/kvfetcher.go b/sql/kvfetcher.go index 6332f86bf211..6002d6d000cc 100644 --- a/sql/kvfetcher.go +++ b/sql/kvfetcher.go @@ -128,63 +128,72 @@ func makeKVFetcher(txn *client.Txn, spans spans, reverse bool, firstBatchLimit i // fetch retrieves spans from the kv func (f *kvFetcher) fetch() *roachpb.Error { // Retrieve all the spans. - b := &client.Batch{} - - // TODO(radu): until we have a per-batch limit (issue #4696), we - // only do batching if we have a single span. - if len(f.spans) == 1 { - count := int64(kvBatchSize) - if f.firstBatchLimit != 0 && f.firstBatchLimit < count && len(f.kvs) == 0 { - count = f.firstBatchLimit - } - if f.spans[0].count != 0 { - if f.spans[0].count <= f.totalFetched { - panic(fmt.Sprintf("trying to fetch beyond span count %d (fetched: %d)", - f.spans[0].count, f.totalFetched)) - } - remaining := f.spans[0].count - f.totalFetched - if count > remaining { - count = remaining - } + batchSize := int64(kvBatchSize) + if f.firstBatchLimit != 0 && len(f.kvs) == 0 && f.firstBatchLimit < batchSize { + batchSize = f.firstBatchLimit + } + + b := &client.Batch{MaxScanResults: batchSize} + + var resumeKey roachpb.Key + if len(f.kvs) > 0 { + resumeKey = f.kvs[len(f.kvs)-1].Key + // To resume forward scans we will set the (inclusive) scan start to the Next of the last + // received key. To resume reverse scans we will set the (exclusive) scan end to the last + // received key. + if !f.reverse { + resumeKey = resumeKey.ShallowNext() } - if f.reverse { - end := f.spans[0].end - if len(f.kvs) > 0 { - // the new range ends at the last key (non-inclusive) - end = f.kvs[len(f.kvs)-1].Key - if end.Equal(f.spans[0].start) { - // No more keys - f.kvs = nil - f.fetchEnd = true - return nil + } + + atEnd := true + if !f.reverse { + for i := 0; i < len(f.spans); i++ { + start := f.spans[i].start + if resumeKey != nil { + if resumeKey.Compare(f.spans[i].end) >= 0 { + // We are resuming from a key after this span. + continue } - } - b.ReverseScan(f.spans[0].start, end, count) - } else { - start := f.spans[0].start - if len(f.kvs) > 0 { - // the new range starts after the last key - start = f.kvs[len(f.kvs)-1].Key.ShallowNext() - if start.Equal(f.spans[0].end) { - // No more keys - f.kvs = nil - f.fetchEnd = true - return nil + if resumeKey.Compare(start) > 0 { + // We are resuming from a key inside this span. + // In this case we should technically reduce the max count for the span; but + // since this count is only an optimization it's not incorrect to retrieve more + // keys for the span. + start = resumeKey } } - b.Scan(start, f.spans[0].end, count) + atEnd = false + b.Scan(start, f.spans[i].end, f.spans[i].count) } } else { - if f.reverse { - for i := len(f.spans) - 1; i >= 0; i-- { - b.ReverseScan(f.spans[i].start, f.spans[i].end, f.spans[i].count) - } - } else { - for i := 0; i < len(f.spans); i++ { - b.Scan(f.spans[i].start, f.spans[i].end, f.spans[i].count) + for i := len(f.spans) - 1; i >= 0; i-- { + end := f.spans[i].end + if resumeKey != nil { + if resumeKey.Compare(f.spans[i].start) <= 0 { + // We are resuming from a key before this span. + continue + } + if resumeKey.Compare(end) < 0 { + // We are resuming from a key inside this span. + // In this case we should technically reduce the max count for the span; but + // since this count is only an optimization it's not incorrect to retrieve more + // keys for the span. + end = resumeKey + } } + atEnd = false + b.ReverseScan(f.spans[i].start, end, f.spans[i].count) } } + + if atEnd { + // The last scan happened to finish just at the end of the last span. + f.kvs = nil + f.fetchEnd = true + return nil + } + if pErr := f.txn.Run(b); pErr != nil { return pErr } @@ -206,8 +215,7 @@ func (f *kvFetcher) fetch() *roachpb.Error { f.totalFetched += int64(len(f.kvs)) f.kvIndex = 0 - if !(len(f.spans) == 1 && len(f.kvs) == kvBatchSize && - (f.spans[0].count == 0 || f.spans[0].count > f.totalFetched)) { + if int64(len(f.kvs)) < batchSize { f.fetchEnd = true } diff --git a/sql/scan_test.go b/sql/scan_test.go index 99f8017deae6..9b862e9797da 100644 --- a/sql/scan_test.go +++ b/sql/scan_test.go @@ -31,8 +31,8 @@ import ( "github.com/cockroachdb/cockroach/util/leaktest" ) -// genRanges generates ordered, non-overlaping ranges with values in [0 and valRange). -func genRanges(num int, valRange int) [][2]int { +// genAs returns num random distinct ordered values in [0, valRange). +func genValues(num, valRange int) []int { // Generate num _distinct_ values. We do this by generating a partial permutation. perm := make([]int, valRange) for i := 0; i < valRange; i++ { @@ -46,42 +46,51 @@ func genRanges(num int, valRange int) [][2]int { perm = perm[:num] // Sort the values. These distinct values will be the starts of our ranges. sort.Ints(perm) - res := make([][2]int, num) - for i := 0; i < num; i++ { - res[i][0] = perm[i] - next := valRange - if i < num-1 { - next = perm[i+1] + return perm +} + +// testScanBatchQuery runs a query of the form +// SELECT a,B FROM test.scan WHERE a IN (1,5,3..) AND b >= 5 AND b <= 10 +// numSpans controls the number of possible values for a. +func testScanBatchQuery(t *testing.T, db *sql.DB, numSpans, numAs, numBs int, reverse bool) { + // Generate numSpans values for A + aVals := genValues(numSpans, numAs) + + // Generate a random range for B + bStart := rand.Int() % numBs + bEnd := bStart + rand.Int()%(numBs-bStart) + + var expected [][2]int + for _, a := range aVals { + for b := bStart; b <= bEnd; b++ { + expected = append(expected, [2]int{a, b}) + } + } + + if len(aVals) == 0 { + // No filter on a. + for a := 0; a < numAs; a++ { + for b := bStart; b <= bEnd; b++ { + expected = append(expected, [2]int{a, b}) + } } - // Pick a random end in the range [perm[i], next). - res[i][1] = perm[i] + rand.Int()%(next-perm[i]) } - return res -} -func testScanBatchQuery(t *testing.T, db *sql.DB, numRanges int, numRows int, reverse bool) { - ranges := genRanges(numRanges, numRows) - expected := []int(nil) var buf bytes.Buffer - buf.WriteString(`SELECT k FROM test.scan`) - for i, r := range ranges { + buf.WriteString(fmt.Sprintf("SELECT a,b FROM test.scan WHERE b >= %d AND b <= %d", bStart, bEnd)) + for i, a := range aVals { if i == 0 { - buf.WriteString(" WHERE ") + buf.WriteString(fmt.Sprintf(" AND a IN (%d", a)) } else { - buf.WriteString(" OR ") - } - buf.WriteString(fmt.Sprintf(`(k >= %d AND k <= %d)`, r[0], r[1])) - for j := r[0]; j <= r[1]; j++ { - expected = append(expected, j) + buf.WriteString(fmt.Sprintf(",%d", a)) } } - if len(ranges) == 0 { - for j := 0; j < numRows; j++ { - expected = append(expected, j) - } + if len(aVals) > 0 { + buf.WriteString(")") } + if reverse { - buf.WriteString(" ORDER BY k DESC") + buf.WriteString(" ORDER BY a DESC, b DESC") for i, j := 0, len(expected)-1; i < j; i, j = i+1, j-1 { expected[i], expected[j] = expected[j], expected[i] } @@ -95,13 +104,14 @@ func testScanBatchQuery(t *testing.T, db *sql.DB, numRanges int, numRows int, re if n >= len(expected) { t.Fatalf("too many rows (expected %d)", len(expected)) } - var val int - err = rows.Scan(&val) + var a, b int + err = rows.Scan(&a, &b) if err != nil { t.Fatal(err) } - if val != expected[n] { - t.Errorf("row %d: invalid value %d (expected %d)", n, val, expected[n]) + if a != expected[n][0] || b != expected[n][1] { + t.Errorf("row %d: invalid values %d,%d (expected %d,%d)", + n, a, b, expected[n][0], expected[n][1]) } n++ } @@ -140,26 +150,29 @@ func TestScanBatches(t *testing.T) { restore := csql.SetKVBatchSize(10) defer restore() - numRows := 100 + numAs := 5 + numBs := 20 if _, err := db.Exec(`DROP TABLE IF EXISTS test.scan`); err != nil { t.Fatal(err) } - if _, err := db.Exec(`CREATE TABLE test.scan (k INT PRIMARY KEY, v STRING)`); err != nil { + if _, err := db.Exec(`CREATE TABLE test.scan (a INT, b INT, v STRING, PRIMARY KEY (a, b))`); err != nil { t.Fatal(err) } var buf bytes.Buffer buf.WriteString(`INSERT INTO test.scan VALUES `) - for i := 0; i < numRows; i++ { - if i > 0 { - buf.WriteString(", ") - } - if i%2 == 0 { - fmt.Fprintf(&buf, "(%d, 'str%d')", i, i) - } else { - // Every other row doesn't get the string value (to have NULLs). - fmt.Fprintf(&buf, "(%d, NULL)", i) + for a := 0; a < numAs; a++ { + for b := 0; b < numBs; b++ { + if a+b > 0 { + buf.WriteString(", ") + } + if (a+b)%2 == 0 { + fmt.Fprintf(&buf, "(%d, %d, 'str%d%d')", a, b, a, b) + } else { + // Every other row doesn't get the string value (to have NULLs). + fmt.Fprintf(&buf, "(%d, %d, NULL)", a, b) + } } } if _, err := db.Exec(buf.String()); err != nil { @@ -167,15 +180,15 @@ func TestScanBatches(t *testing.T) { } // The table will have one key for the even rows, and two keys for the odd rows. - batchSizes := []int{1, 2, 3, 5, 10, 13, 100, 3*numRows/2 - 1, 3 * numRows / 2, 3*numRows/2 + 1} - // We can test with at most one one span for now (see kvFetcher.fetch) - numSpanValues := []int{0, 1} + numKeys := 3 * numAs * numBs / 2 + batchSizes := []int{1, 2, 3, 5, 10, 13, 100, numKeys - 1, numKeys, numKeys + 1} + numSpanValues := []int{0, 1, 2, 3} for _, batch := range batchSizes { csql.SetKVBatchSize(batch) for _, numSpans := range numSpanValues { - testScanBatchQuery(t, db, numSpans, numRows, false) - testScanBatchQuery(t, db, numSpans, numRows, true) + testScanBatchQuery(t, db, numSpans, numAs, numBs, false) + testScanBatchQuery(t, db, numSpans, numAs, numBs, true) } }