Skip to content

Commit

Permalink
run multiple queries per table at once with boltdb-shipper (#2656)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored Sep 25, 2020
1 parent 89b8ae4 commit 79c0275
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 34 deletions.
18 changes: 13 additions & 5 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func (t *Table) Close() {
t.dbs = map[string]*downloadedFile{}
}

// Queries all the dbs for index.
func (t *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
// MultiQueries runs multiple queries without having to take lock multiple times for each query.
func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
// let us check if table is ready for use while also honoring the context timeout
select {
case <-ctx.Done():
Expand All @@ -216,10 +216,18 @@ func (t *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chun

t.lastUsedAt = time.Now()

for _, db := range t.dbs {
if err := t.boltDBIndexClient.QueryDB(ctx, db.boltdb, query, callback); err != nil {
return err
log, ctx := spanlogger.New(ctx, "Shipper.Downloads.Table.MultiQueries")
defer log.Span.Finish()

level.Debug(log).Log("table-name", t.name, "query-count", len(queries))

for name, db := range t.dbs {
for _, query := range queries {
if err := t.boltDBIndexClient.QueryDB(ctx, db.boltdb, query, callback); err != nil {
return err
}
}
level.Debug(log).Log("queried-db", name)
}

return nil
Expand Down
24 changes: 17 additions & 7 deletions pkg/storage/stores/shipper/downloads/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/storage/stores/shipper/util"
)

const cacheCleanupInterval = time.Hour
Expand Down Expand Up @@ -102,27 +104,35 @@ func (tm *TableManager) Stop() {
}

func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
return chunk_util.DoParallelQueries(ctx, tm.query, queries, callback)
queriesByTable := util.QueriesByTable(queries)
for tableName, queries := range queriesByTable {
err := tm.query(ctx, tableName, queries, callback)
if err != nil {
return err
}
}

return nil
}

func (tm *TableManager) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
log, ctx := spanlogger.New(ctx, "Shipper.Downloads.Query")
defer log.Span.Finish()

level.Debug(log).Log("table-name", query.TableName)
level.Debug(log).Log("table-name", tableName)

table := tm.getOrCreateTable(ctx, query.TableName)
table := tm.getOrCreateTable(ctx, tableName)

err := table.Query(ctx, query, callback)
err := util.DoParallelQueries(ctx, table, queries, callback)
if err != nil {
if table.Err() != nil {
// table is in invalid state, remove the table so that next queries re-create it.
tm.tablesMtx.Lock()
defer tm.tablesMtx.Unlock()

level.Error(pkg_util.Logger).Log("msg", fmt.Sprintf("table %s has some problem, cleaning it up", query.TableName), "err", table.Err())
level.Error(pkg_util.Logger).Log("msg", fmt.Sprintf("table %s has some problem, cleaning it up", tableName), "err", table.Err())

delete(tm.tables, query.TableName)
delete(tm.tables, tableName)
return table.Err()
}
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/storage/stores/shipper/downloads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -58,7 +59,7 @@ func buildTestTable(t *testing.T, tableName, path string) (*Table, *local.BoltIn
}
}

func TestTable_Query(t *testing.T) {
func TestTable_MultiQueries(t *testing.T) {
tempDir, err := ioutil.TempDir("", "table-writes")
require.NoError(t, err)

Expand Down Expand Up @@ -90,7 +91,14 @@ func TestTable_Query(t *testing.T) {
stopFunc()
}()

testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 30)
// build queries each looking for specific value from all the dbs
var queries []chunk.IndexQuery
for i := 5; i < 25; i++ {
queries = append(queries, chunk.IndexQuery{ValueEqual: []byte(strconv.Itoa(i))})
}

// query the loaded table to see if it has right data.
testutil.TestSingleTableQuery(t, queries, table, 5, 20)
}

func TestTable_Sync(t *testing.T) {
Expand Down Expand Up @@ -136,7 +144,7 @@ func TestTable_Sync(t *testing.T) {
}()

// query table to see it has expected records setup
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 30)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, 30)

// add a sleep since we are updating a file and CI is sometimes too fast to create a difference in mtime of files
time.Sleep(time.Second)
Expand All @@ -150,7 +158,7 @@ func TestTable_Sync(t *testing.T) {
require.NoError(t, table.Sync(context.Background()))

// query and verify table has expected records from new and updated db and the records from deleted db are gone
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 10, 40)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 10, 40)

// verify files in cache where dbs for the table are synced to double check.
expectedFilesInDir := map[string]struct{}{
Expand Down Expand Up @@ -187,7 +195,7 @@ func TestTable_LastUsedAt(t *testing.T) {
require.InDelta(t, time.Now().Add(-time.Hour).Unix(), table.LastUsedAt().Unix(), 1)

// query the table which should set the last used at to now.
err = table.Query(context.Background(), chunk.IndexQuery{}, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
err = table.MultiQueries(context.Background(), []chunk.IndexQuery{{}}, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
return true
})
require.NoError(t, err)
Expand Down Expand Up @@ -226,7 +234,7 @@ func TestTable_doParallelDownload(t *testing.T) {

// ensure that we have `tc` number of files downloaded and opened.
require.Len(t, table.dbs, tc)
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, tc*10)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, tc*10)
})
}
}
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ func AddRecordsToBatch(batch chunk.WriteBatch, tableName string, start, numRecor
}

type SingleTableQuerier interface {
Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error
MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error
}

func TestSingleQuery(t *testing.T, query chunk.IndexQuery, querier SingleTableQuerier, start, numRecords int) {
func TestSingleTableQuery(t *testing.T, queries []chunk.IndexQuery, querier SingleTableQuerier, start, numRecords int) {
minValue := start
maxValue := start + numRecords
fetchedRecords := make(map[string]string)

err := querier.Query(context.Background(), query, makeTestCallback(t, minValue, maxValue, fetchedRecords))
err := querier.MultiQueries(context.Background(), queries, makeTestCallback(t, minValue, maxValue, fetchedRecords))

require.NoError(t, err)
require.Len(t, fetchedRecords, numRecords)
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,16 @@ func newTableWithDBs(dbs map[string]*bbolt.DB, path, uploader string, storageCli
}, nil
}

// Query serves the index by querying all the open dbs.
func (lt *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
// MultiQueries runs multiple queries without having to take lock multiple times for each query.
func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
lt.dbsMtx.RLock()
defer lt.dbsMtx.RUnlock()

for _, db := range lt.dbs {
if err := lt.boltdbIndexClient.QueryDB(ctx, db, query, callback); err != nil {
return err
for _, query := range queries {
if err := lt.boltdbIndexClient.QueryDB(ctx, db, query, callback); err != nil {
return err
}
}
}

Expand Down
18 changes: 14 additions & 4 deletions pkg/storage/stores/shipper/uploads/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/storage/stores/shipper/util"
)

type Config struct {
Expand Down Expand Up @@ -88,22 +90,30 @@ func (tm *TableManager) Stop() {
}

func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
return chunk_util.DoParallelQueries(ctx, tm.query, queries, callback)
queriesByTable := util.QueriesByTable(queries)
for tableName, queries := range queriesByTable {
err := tm.query(ctx, tableName, queries, callback)
if err != nil {
return err
}
}

return nil
}

func (tm *TableManager) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
tm.tablesMtx.RLock()
defer tm.tablesMtx.RUnlock()

log, ctx := spanlogger.New(ctx, "Shipper.Uploads.Query")
defer log.Span.Finish()

table, ok := tm.tables[query.TableName]
table, ok := tm.tables[tableName]
if !ok {
return nil
}

return table.Query(ctx, query, callback)
return util.DoParallelQueries(ctx, table, queries, callback)
}

func (tm *TableManager) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/uploads/table_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestLoadTables(t *testing.T) {
require.True(t, !stat.IsDir())

for tableName, expectedIndex := range expectedTables {
testutil.TestSingleQuery(t, chunk.IndexQuery{TableName: tableName}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{TableName: tableName}}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords)
}
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func TestTableManager_BatchWrite(t *testing.T) {
require.Len(t, tm.tables, len(tc))

for tableName, expectedIndex := range tc {
testutil.TestSingleQuery(t, chunk.IndexQuery{TableName: tableName}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{TableName: tableName}}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords)
}
}

Expand Down
53 changes: 50 additions & 3 deletions pkg/storage/stores/shipper/uploads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -88,7 +89,7 @@ func TestLoadTable(t *testing.T) {
}()

// query the loaded table to see if it has right data.
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 20)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, 20)
}

func TestTable_Write(t *testing.T) {
Expand Down Expand Up @@ -147,7 +148,7 @@ func TestTable_Write(t *testing.T) {
require.True(t, ok)

// test that the table has current + previous records
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, (i+1)*10)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, (i+1)*10)
testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, db, boltIndexClient, i*10, 10)
})
}
Expand Down Expand Up @@ -439,7 +440,7 @@ func TestTable_ImmutableUploads(t *testing.T) {
dir, err := ioutil.ReadDir(filepath.Join(objectStorageDir, table.name))
require.NoError(t, err)
for _, d := range dir {
os.RemoveAll(filepath.Join(objectStorageDir, table.name, d.Name()))
require.NoError(t, os.RemoveAll(filepath.Join(objectStorageDir, table.name, d.Name())))
}

// force upload of dbs
Expand All @@ -450,3 +451,49 @@ func TestTable_ImmutableUploads(t *testing.T) {
require.NoFileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB))))
}
}

func TestTable_MultiQueries(t *testing.T) {
indexPath, err := ioutil.TempDir("", "table-multi-queries")
require.NoError(t, err)

defer func() {
require.NoError(t, os.RemoveAll(indexPath))
}()

boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: indexPath})
require.NoError(t, err)

defer func() {
boltDBIndexClient.Stop()
}()

// setup some dbs for a table at a path.
tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, map[string]testutil.DBRecords{
"db1": {
Start: 0,
NumRecords: 10,
},
"db2": {
Start: 10,
NumRecords: 10,
},
}, false)

// try loading the table.
table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient)
require.NoError(t, err)
require.NotNil(t, table)

defer func() {
table.Stop()
}()

// build queries each looking for specific value from all the dbs
var queries []chunk.IndexQuery
for i := 5; i < 15; i++ {
queries = append(queries, chunk.IndexQuery{ValueEqual: []byte(strconv.Itoa(i))})
}

// query the loaded table to see if it has right data.
testutil.TestSingleTableQuery(t, queries, table, 5, 10)
}
50 changes: 50 additions & 0 deletions pkg/storage/stores/shipper/util/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package util

import (
"context"

"github.com/cortexproject/cortex/pkg/chunk"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util"
)

const maxQueriesPerGoroutine = 100

type TableQuerier interface {
MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error
}

// QueriesByTable groups and returns queries by tables.
func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery {
queriesByTable := make(map[string][]chunk.IndexQuery)
for _, query := range queries {
if _, ok := queriesByTable[query.TableName]; !ok {
queriesByTable[query.TableName] = []chunk.IndexQuery{}
}

queriesByTable[query.TableName] = append(queriesByTable[query.TableName], query)
}

return queriesByTable
}

func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
errs := make(chan error)

for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
q := queries[i:util.Min(i+maxQueriesPerGoroutine, len(queries))]
go func(queries []chunk.IndexQuery) {
errs <- tableQuerier.MultiQueries(ctx, queries, callback)
}(q)
}

var lastErr error
for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
err := <-errs
if err != nil {
lastErr = err
}
}

return lastErr
}
Loading

0 comments on commit 79c0275

Please sign in to comment.