From 1d02201c55073ba75ebc9fc492b136008893aa23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 6 Jul 2018 12:46:01 +0300 Subject: [PATCH] Implement efficient range-scans Allow for the token-space to be split up into a configurable amount of sub-ranges and concurrently scan a configurable subset of them. The algorithm used is that described in Avi's efficent range scan blog post [1]. The number of sub-ranges to split the token-space into can be set by the `-range-count` command line paramaters. This defaults to 1, in which case the table is scanned in a single query. The concurrency can be set with the `-concurrency` command line argument. Each sub-range will be an `op`. [1] https://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/ --- README.md | 17 ++++++++---- main.go | 17 +++++++++++- modes.go | 6 +++-- workloads.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 102 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 0c6f472..00d67ee 100644 --- a/README.md +++ b/README.md @@ -93,13 +93,20 @@ Counter read mode works in exactly the same as regular read mode (with the same #### Scan mode (`-mode scan`) -Scan the entire table. This mode does not allow the `workload` to be configured. It is important to note that range-scans put a significant load on the cluster and also take a long time to complete. -Thus it is advisable to pass a significantly larger timeout (in the minutes range) and low concurrency. +Scan the entire table. This mode does not allow the `workload` to be configured (it has its own workload called `scan`). The scan mode allows for the token-space to be split into a user configurable sub-ranges and for querying these sub-ranges concurrently. The algorithm used is that descibed by [Avi's efficient range scans blog post](https://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/). +The amount of sub-ranges that the token-space will be split into can be set by the `-range-count` flag. The recommended number to set this to is: + + -range-count = (nodes in cluster) ✕ (cores in node) ✕ 300 + +The number of sub-ranges to be read concurrency can be set by the `-concurrency` flag as usual. The recommended concurrency is: + + -concurrency = range-count/100 + +For more details on these numbers see the above mentioned blog post. + Essentially the following query is executed: -``` -SELECT * FROM scylla_bench.test -``` + SELECT * FROM scylla_bench.test WHERE token(pk) >= ? AND token(pk) <= ? ### Workloads diff --git a/main.go b/main.go index 2c19027..71b3fc4 100644 --- a/main.go +++ b/main.go @@ -34,6 +34,8 @@ var ( inRestriction bool noLowerBound bool + rangeCount int + timeout time.Duration startTime time.Time @@ -97,7 +99,15 @@ func GetWorkload(name string, threadId int, partitionOffset int64, mode string, log.Fatal("time series workload supports only write and read modes") } case "scan": - return &RangeScan{} + rangesPerThread := rangeCount / concurrency + thisOffset := rangesPerThread * threadId + var thisCount int + if threadId+1 == concurrency { + thisCount = rangeCount - thisOffset + } else { + thisCount = rangesPerThread + } + return NewRangeScan(rangeCount, thisOffset, thisCount) default: log.Fatal("unknown workload: ", name) } @@ -189,6 +199,7 @@ func main() { flag.BoolVar(&provideUpperBound, "provide-upper-bound", false, "whether read requests should provide an upper bound") flag.BoolVar(&inRestriction, "in-restriction", false, "use IN restriction in read requests") flag.BoolVar(&noLowerBound, "no-lower-bound", false, "do not provide lower bound in read requests") + flag.IntVar(&rangeCount, "range-count", 1, "number of ranges to split the token space into (relevant only for scan mode)") flag.DurationVar(&testDuration, "duration", 0, "duration of the test in seconds (0 for unlimited)") @@ -223,6 +234,10 @@ func main() { log.Fatal("workload type cannot be scpecified for scan mode") } workload = "scan" + if concurrency > rangeCount { + concurrency = rangeCount + log.Printf("adjusting concurrency to the highest useful value of %v", concurrency) + } } else { if workload == "" { log.Fatal("workload type needs to be specified") diff --git a/modes.go b/modes.go index 2b250c2..e019546 100644 --- a/modes.go +++ b/modes.go @@ -456,12 +456,14 @@ func DoReadsFromTable(table string, session *gocql.Session, resultChannel chan R } func DoScanTable(session *gocql.Session, resultChannel chan Result, workload WorkloadGenerator, rateLimiter RateLimiter) { - request := fmt.Sprintf("SELECT * FROM %s.%s", keyspaceName, tableName) + request := fmt.Sprintf("SELECT * FROM %s.%s WHERE token(pk) >= ? AND token(pk) <= ?", keyspaceName, tableName) query := session.Query(request) RunTest(resultChannel, workload, rateLimiter, func(rb *ResultBuilder) (error, time.Duration) { requestStart := time.Now() - iter := query.Iter() + currentRange := workload.NextTokenRange() + bound := query.Bind(currentRange.Start, currentRange.End) + iter := bound.Iter() for iter.Scan(nil, nil, nil) { rb.IncRows() } diff --git a/workloads.go b/workloads.go index bc21b43..19f264b 100644 --- a/workloads.go +++ b/workloads.go @@ -15,7 +15,19 @@ func MinInt64(a int64, b int64) int64 { } } +const ( + minToken int64 = -(1 << 63) + maxToken int64 = (1 << 63) - 1 +) + +// Bounds are inclusive +type TokenRange struct { + Start int64 + End int64 +} + type WorkloadGenerator interface { + NextTokenRange() TokenRange NextPartitionKey() int64 NextClusteringKey() int64 IsPartitionDone() bool @@ -33,6 +45,10 @@ func NewSequentialVisitAll(partitionOffset int64, partitionCount int64, clusteri return &SequentialVisitAll{partitionOffset + partitionCount, clusteringRowCount, partitionOffset, 0} } +func (sva *SequentialVisitAll) NextTokenRange() TokenRange { + panic("SequentialVisitAll does not support NextTokenRange()") +} + func (sva *SequentialVisitAll) NextPartitionKey() int64 { if sva.NextClusteringRow < sva.ClusteringRowCount { return sva.NextPartition @@ -68,6 +84,10 @@ func NewRandomUniform(i int, partitionCount int64, clusteringRowCount int64) *Ra return &RandomUniform{generator, int64(partitionCount), int64(clusteringRowCount)} } +func (ru *RandomUniform) NextTokenRange() TokenRange { + panic("RandomUniform does not support NextTokenRange()") +} + func (ru *RandomUniform) NextPartitionKey() int64 { return ru.Generator.Int63n(ru.PartitionCount) } @@ -105,6 +125,10 @@ func NewTimeSeriesWriter(threadId int, threadCount int, pkCount int64, ckCount i ckCount, 0, startTime, period, false} } +func (tsw *TimeSeriesWrite) NextTokenRange() TokenRange { + panic("TimeSeriesWrite does not support NextTokenRange()") +} + func (tsw *TimeSeriesWrite) NextPartitionKey() int64 { tsw.PkPosition += tsw.PkStride if tsw.PkPosition >= tsw.PkCount { @@ -173,6 +197,10 @@ func RandomInt64(generator *rand.Rand, halfNormalDist bool, maxValue int64) int6 } } +func (tsw *TimeSeriesRead) NextTokenRange() TokenRange { + panic("TimeSeriesRead does not support NextTokenRange()") +} + func (tsw *TimeSeriesRead) NextPartitionKey() int64 { tsw.PkPosition += tsw.PkStride if tsw.PkPosition >= tsw.PkCount { @@ -198,8 +226,47 @@ func (tsw *TimeSeriesRead) IsPartitionDone() bool { return false } -// Dummy workload generator for range scans type RangeScan struct { + TotalRangeCount int + RangeOffset int + RangeCount int + NextRange int +} + +func NewRangeScan(totalRangeCount int, rangeOffset int, rangeCount int) *RangeScan { + return &RangeScan{totalRangeCount, rangeOffset, rangeOffset + rangeCount, rangeOffset} +} + +func (rs* RangeScan) NextTokenRange() TokenRange { + // Special case, no range splitting + if rs.TotalRangeCount == 1 { + rs.NextRange++; + return TokenRange{minToken, maxToken} + } + + // This is in fact -1 compared to the real number of tokens, which + // is 2**64. But this is fine, as the worst that can happen is that + // due to the inprecise calculation of tokensPerRange more tokens + // will be in the very last range than should be, which is + // tolerable. + const tokenCount uint64 = ^uint64(0) + // Due to the special handling of TotalRangeCount == 1 above, this + // is guaranteed to safely fit into an int64 + tokensPerRange := int64(tokenCount / uint64(rs.TotalRangeCount)) + + currentRange := rs.NextRange + rs.NextRange++; + + firstToken := minToken + int64(currentRange) * tokensPerRange + var lastToken int64 + // Make sure the very last range streches all the way to maxToken. + if rs.NextRange == rs.TotalRangeCount { + lastToken = maxToken + } else { + lastToken = firstToken + tokensPerRange - 1 + } + + return TokenRange{firstToken, lastToken} } func (*RangeScan) NextPartitionKey() int64 { @@ -214,6 +281,6 @@ func (*RangeScan) IsPartitionDone() bool { return false } -func (*RangeScan) IsDone() bool { - return false +func (rs *RangeScan) IsDone() bool { + return rs.NextRange >= rs.RangeCount }