Implement efficient range-scans
Allow for the token-space to be split up into a configurable number of
sub-ranges and concurrently scan a configurable subset of them. The
algorithm used is the one described in Avi's efficient range scan blog
post [1].
The number of sub-ranges to split the token-space into can be set
with the `-range-count` command line parameter. This defaults to 1, in
which case the table is scanned in a single query.
The concurrency can be set with the `-concurrency` command line
argument. Each sub-range will be an `op`.

[1] https://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/
denesb authored and mmatczuk committed Aug 24, 2018
1 parent 0a5e6a9 commit 1d02201
Showing 4 changed files with 102 additions and 11 deletions.
17 changes: 12 additions & 5 deletions README.md
@@ -93,13 +93,20 @@ Counter read mode works in exactly the same way as regular read mode (with the same

#### Scan mode (`-mode scan`)

Scan the entire table. This mode does not allow the `workload` to be configured. It is important to note that range-scans put a significant load on the cluster and also take a long time to complete.
Thus it is advisable to pass a significantly larger timeout (in the minutes range) and use a low concurrency.
Scan the entire table. This mode does not allow the `workload` to be configured (it has its own workload called `scan`). Scan mode allows the token-space to be split into a user-configurable number of sub-ranges and queries these sub-ranges concurrently. The algorithm used is the one described in [Avi's efficient range scans blog post](https://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/).
The number of sub-ranges that the token-space will be split into can be set with the `-range-count` flag. The recommended value is:

-range-count = (nodes in cluster) ✕ (cores in node) ✕ 300

The number of sub-ranges to be read concurrently can be set with the `-concurrency` flag as usual. The recommended concurrency is:

-concurrency = range-count/100

For more details on these numbers see the above-mentioned blog post.
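
For example, for a hypothetical 3-node cluster with 8 cores per node, these recommendations work out to:

-range-count = 3 ✕ 8 ✕ 300 = 7200

-concurrency = 7200/100 = 72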

Essentially the following query is executed. Before this change:

```
SELECT * FROM scylla_bench.test
```

With range scans:

```
SELECT * FROM scylla_bench.test WHERE token(pk) >= ? AND token(pk) <= ?
```
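
Putting the above together, a scan of the hypothetical example cluster might be invoked along these lines (illustrative values only; the binary name and the timeout flag are assumptions here):

```
scylla-bench -mode scan -range-count 7200 -concurrency 72 -timeout 5m
```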

### Workloads

17 changes: 16 additions & 1 deletion main.go
@@ -34,6 +34,8 @@ var (
inRestriction bool
noLowerBound bool

rangeCount int

timeout time.Duration

startTime time.Time
@@ -97,7 +99,15 @@ func GetWorkload(name string, threadId int, partitionOffset int64, mode string,
log.Fatal("time series workload supports only write and read modes")
}
case "scan":
return &RangeScan{}
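// Split the sub-ranges evenly across worker threads; the last thread
// also takes the remainder, e.g. rangeCount=10 with concurrency=3
// gives the threads 3, 3 and 4 ranges respectively.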
rangesPerThread := rangeCount / concurrency
thisOffset := rangesPerThread * threadId
var thisCount int
if threadId+1 == concurrency {
thisCount = rangeCount - thisOffset
} else {
thisCount = rangesPerThread
}
return NewRangeScan(rangeCount, thisOffset, thisCount)
default:
log.Fatal("unknown workload: ", name)
}
@@ -189,6 +199,7 @@ func main() {
flag.BoolVar(&provideUpperBound, "provide-upper-bound", false, "whether read requests should provide an upper bound")
flag.BoolVar(&inRestriction, "in-restriction", false, "use IN restriction in read requests")
flag.BoolVar(&noLowerBound, "no-lower-bound", false, "do not provide lower bound in read requests")
flag.IntVar(&rangeCount, "range-count", 1, "number of ranges to split the token space into (relevant only for scan mode)")

flag.DurationVar(&testDuration, "duration", 0, "duration of the test in seconds (0 for unlimited)")

@@ -223,6 +234,10 @@ func main() {
log.Fatal("workload type cannot be scpecified for scan mode")
}
workload = "scan"
if concurrency > rangeCount {
concurrency = rangeCount
log.Printf("adjusting concurrency to the highest useful value of %v", concurrency)
}
} else {
if workload == "" {
log.Fatal("workload type needs to be specified")
6 changes: 4 additions & 2 deletions modes.go
@@ -456,12 +456,14 @@ func DoReadsFromTable(table string, session *gocql.Session, resultChannel chan R
}

func DoScanTable(session *gocql.Session, resultChannel chan Result, workload WorkloadGenerator, rateLimiter RateLimiter) {
request := fmt.Sprintf("SELECT * FROM %s.%s", keyspaceName, tableName)
request := fmt.Sprintf("SELECT * FROM %s.%s WHERE token(pk) >= ? AND token(pk) <= ?", keyspaceName, tableName)
query := session.Query(request)

RunTest(resultChannel, workload, rateLimiter, func(rb *ResultBuilder) (error, time.Duration) {
requestStart := time.Now()
iter := query.Iter()
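// Each op scans one token sub-range: fetch the next inclusive range
// from the workload and bind its bounds into the query.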
currentRange := workload.NextTokenRange()
bound := query.Bind(currentRange.Start, currentRange.End)
iter := bound.Iter()
for iter.Scan(nil, nil, nil) {
rb.IncRows()
}
73 changes: 70 additions & 3 deletions workloads.go
@@ -15,7 +15,19 @@ func MinInt64(a int64, b int64) int64 {
}
}

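// The full token space of the Murmur3 partitioner used by Scylla:
// [-2^63, 2^63-1].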
const (
minToken int64 = -(1 << 63)
maxToken int64 = (1 << 63) - 1
)

// Bounds are inclusive
type TokenRange struct {
Start int64
End int64
}

type WorkloadGenerator interface {
NextTokenRange() TokenRange
NextPartitionKey() int64
NextClusteringKey() int64
IsPartitionDone() bool
@@ -33,6 +45,10 @@ func NewSequentialVisitAll(partitionOffset int64, partitionCount int64, clusteri
return &SequentialVisitAll{partitionOffset + partitionCount, clusteringRowCount, partitionOffset, 0}
}

func (sva *SequentialVisitAll) NextTokenRange() TokenRange {
panic("SequentialVisitAll does not support NextTokenRange()")
}

func (sva *SequentialVisitAll) NextPartitionKey() int64 {
if sva.NextClusteringRow < sva.ClusteringRowCount {
return sva.NextPartition
@@ -68,6 +84,10 @@ func NewRandomUniform(i int, partitionCount int64, clusteringRowCount int64) *Ra
return &RandomUniform{generator, int64(partitionCount), int64(clusteringRowCount)}
}

func (ru *RandomUniform) NextTokenRange() TokenRange {
panic("RandomUniform does not support NextTokenRange()")
}

func (ru *RandomUniform) NextPartitionKey() int64 {
return ru.Generator.Int63n(ru.PartitionCount)
}
@@ -105,6 +125,10 @@ func NewTimeSeriesWriter(threadId int, threadCount int, pkCount int64, ckCount i
ckCount, 0, startTime, period, false}
}

func (tsw *TimeSeriesWrite) NextTokenRange() TokenRange {
panic("TimeSeriesWrite does not support NextTokenRange()")
}

func (tsw *TimeSeriesWrite) NextPartitionKey() int64 {
tsw.PkPosition += tsw.PkStride
if tsw.PkPosition >= tsw.PkCount {
@@ -173,6 +197,10 @@ func RandomInt64(generator *rand.Rand, halfNormalDist bool, maxValue int64) int6
}
}

func (tsw *TimeSeriesRead) NextTokenRange() TokenRange {
panic("TimeSeriesRead does not support NextTokenRange()")
}

func (tsw *TimeSeriesRead) NextPartitionKey() int64 {
tsw.PkPosition += tsw.PkStride
if tsw.PkPosition >= tsw.PkCount {
@@ -198,8 +226,47 @@ func (tsw *TimeSeriesRead) IsPartitionDone() bool {
return false
}

// Dummy workload generator for range scans
type RangeScan struct {
TotalRangeCount int
RangeOffset int
RangeCount int
NextRange int
}

func NewRangeScan(totalRangeCount int, rangeOffset int, rangeCount int) *RangeScan {
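// Note that the RangeCount field is initialized to rangeOffset+rangeCount,
// i.e. the index one past this generator's last range; IsDone() compares
// NextRange against it.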
return &RangeScan{totalRangeCount, rangeOffset, rangeOffset + rangeCount, rangeOffset}
}

func (rs *RangeScan) NextTokenRange() TokenRange {
// Special case, no range splitting
if rs.TotalRangeCount == 1 {
rs.NextRange++
return TokenRange{minToken, maxToken}
}

// This is in fact -1 compared to the real number of tokens, which
// is 2**64. But this is fine, as the worst that can happen is that
// due to the imprecise calculation of tokensPerRange more tokens
// will be in the very last range than should be, which is
// tolerable.
const tokenCount uint64 = ^uint64(0)
// Due to the special handling of TotalRangeCount == 1 above, this
// is guaranteed to safely fit into an int64
tokensPerRange := int64(tokenCount / uint64(rs.TotalRangeCount))
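
// For illustration: with TotalRangeCount == 4, tokensPerRange is
// (2^64-1)/4 == 2^62-1, so each of the first three ranges covers
// 2^62-1 tokens and the final one is stretched to end at maxToken.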

currentRange := rs.NextRange
rs.NextRange++

firstToken := minToken + int64(currentRange)*tokensPerRange
var lastToken int64
// Make sure the very last range stretches all the way to maxToken.
if rs.NextRange == rs.TotalRangeCount {
lastToken = maxToken
} else {
lastToken = firstToken + tokensPerRange - 1
}

return TokenRange{firstToken, lastToken}
}

func (*RangeScan) NextPartitionKey() int64 {
@@ -214,6 +281,6 @@ func (*RangeScan) IsPartitionDone() bool {
return false
}

func (*RangeScan) IsDone() bool {
return false
func (rs *RangeScan) IsDone() bool {
return rs.NextRange >= rs.RangeCount
}
