diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 772b355fe40ca..582903bac46c4 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -198,8 +198,8 @@ func (t *Table) Close() { t.dbs = map[string]*downloadedFile{} } -// Queries all the dbs for index. -func (t *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { +// MultiQueries runs multiple queries without having to take lock multiple times for each query. +func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { // let us check if table is ready for use while also honoring the context timeout select { case <-ctx.Done(): @@ -216,10 +216,18 @@ func (t *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chun t.lastUsedAt = time.Now() - for _, db := range t.dbs { - if err := t.boltDBIndexClient.QueryDB(ctx, db.boltdb, query, callback); err != nil { - return err + log, ctx := spanlogger.New(ctx, "Shipper.Downloads.Table.MultiQueries") + defer log.Span.Finish() + + level.Debug(log).Log("table-name", t.name, "query-count", len(queries)) + + for name, db := range t.dbs { + for _, query := range queries { + if err := t.boltDBIndexClient.QueryDB(ctx, db.boltdb, query, callback); err != nil { + return err + } } + level.Debug(log).Log("queried-db", name) } return nil diff --git a/pkg/storage/stores/shipper/downloads/table_manager.go b/pkg/storage/stores/shipper/downloads/table_manager.go index 8957784cc7466..22285bc1456db 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager.go +++ b/pkg/storage/stores/shipper/downloads/table_manager.go @@ -13,6 +13,8 @@ import ( "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) const cacheCleanupInterval = time.Hour @@ -102,27 +104,35 @@ func (tm *TableManager) Stop() { } func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { - return chunk_util.DoParallelQueries(ctx, tm.query, queries, callback) + queriesByTable := util.QueriesByTable(queries) + for tableName, queries := range queriesByTable { + err := tm.query(ctx, tableName, queries, callback) + if err != nil { + return err + } + } + + return nil } -func (tm *TableManager) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { +func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk_util.Callback) error { log, ctx := spanlogger.New(ctx, "Shipper.Downloads.Query") defer log.Span.Finish() - level.Debug(log).Log("table-name", query.TableName) + level.Debug(log).Log("table-name", tableName) - table := tm.getOrCreateTable(ctx, query.TableName) + table := tm.getOrCreateTable(ctx, tableName) - err := table.Query(ctx, query, callback) + err := util.DoParallelQueries(ctx, table, queries, callback) if err != nil { if table.Err() != nil { // table is in invalid state, remove the table so that next queries re-create it. 
tm.tablesMtx.Lock() defer tm.tablesMtx.Unlock() - level.Error(pkg_util.Logger).Log("msg", fmt.Sprintf("table %s has some problem, cleaning it up", query.TableName), "err", table.Err()) + level.Error(pkg_util.Logger).Log("msg", fmt.Sprintf("table %s has some problem, cleaning it up", tableName), "err", table.Err()) - delete(tm.tables, query.TableName) + delete(tm.tables, tableName) return table.Err() } } diff --git a/pkg/storage/stores/shipper/downloads/table_test.go b/pkg/storage/stores/shipper/downloads/table_test.go index bab37ff1671ef..732ff56351c64 100644 --- a/pkg/storage/stores/shipper/downloads/table_test.go +++ b/pkg/storage/stores/shipper/downloads/table_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strconv" "testing" "time" @@ -58,7 +59,7 @@ func buildTestTable(t *testing.T, tableName, path string) (*Table, *local.BoltIn } } -func TestTable_Query(t *testing.T) { +func TestTable_MultiQueries(t *testing.T) { tempDir, err := ioutil.TempDir("", "table-writes") require.NoError(t, err) @@ -90,7 +91,14 @@ func TestTable_Query(t *testing.T) { stopFunc() }() - testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 30) + // build queries each looking for specific value from all the dbs + var queries []chunk.IndexQuery + for i := 5; i < 25; i++ { + queries = append(queries, chunk.IndexQuery{ValueEqual: []byte(strconv.Itoa(i))}) + } + + // query the loaded table to see if it has right data. + testutil.TestSingleTableQuery(t, queries, table, 5, 20) } func TestTable_Sync(t *testing.T) { @@ -136,7 +144,7 @@ func TestTable_Sync(t *testing.T) { }() // query table to see it has expected records setup - testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 30) + testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, 30) // add a sleep since we are updating a file and CI is sometimes too fast to create a difference in mtime of files time.Sleep(time.Second) @@ -150,7 +158,7 @@ func TestTable_Sync(t *testing.T) { require.NoError(t, table.Sync(context.Background())) // query and verify table has expected records from new and updated db and the records from deleted db are gone - testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 10, 40) + testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 10, 40) // verify files in cache where dbs for the table are synced to double check. expectedFilesInDir := map[string]struct{}{ @@ -187,7 +195,7 @@ func TestTable_LastUsedAt(t *testing.T) { require.InDelta(t, time.Now().Add(-time.Hour).Unix(), table.LastUsedAt().Unix(), 1) // query the table which should set the last used at to now. - err = table.Query(context.Background(), chunk.IndexQuery{}, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { + err = table.MultiQueries(context.Background(), []chunk.IndexQuery{{}}, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { return true }) require.NoError(t, err) @@ -226,7 +234,7 @@ func TestTable_doParallelDownload(t *testing.T) { // ensure that we have `tc` number of files downloaded and opened. 
require.Len(t, table.dbs, tc) - testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, tc*10) + testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, tc*10) }) } } diff --git a/pkg/storage/stores/shipper/testutil/testutil.go b/pkg/storage/stores/shipper/testutil/testutil.go index 9d484d73775bf..ee96d9b6287c0 100644 --- a/pkg/storage/stores/shipper/testutil/testutil.go +++ b/pkg/storage/stores/shipper/testutil/testutil.go @@ -43,15 +43,15 @@ func AddRecordsToBatch(batch chunk.WriteBatch, tableName string, start, numRecor } type SingleTableQuerier interface { - Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error + MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error } -func TestSingleQuery(t *testing.T, query chunk.IndexQuery, querier SingleTableQuerier, start, numRecords int) { +func TestSingleTableQuery(t *testing.T, queries []chunk.IndexQuery, querier SingleTableQuerier, start, numRecords int) { minValue := start maxValue := start + numRecords fetchedRecords := make(map[string]string) - err := querier.Query(context.Background(), query, makeTestCallback(t, minValue, maxValue, fetchedRecords)) + err := querier.MultiQueries(context.Background(), queries, makeTestCallback(t, minValue, maxValue, fetchedRecords)) require.NoError(t, err) require.Len(t, fetchedRecords, numRecords) diff --git a/pkg/storage/stores/shipper/uploads/table.go b/pkg/storage/stores/shipper/uploads/table.go index dc3b5e91f16ce..e138f8bcb7788 100644 --- a/pkg/storage/stores/shipper/uploads/table.go +++ b/pkg/storage/stores/shipper/uploads/table.go @@ -98,14 +98,16 @@ func newTableWithDBs(dbs map[string]*bbolt.DB, path, uploader string, storageCli }, nil } -// Query serves the index by querying all the open dbs. -func (lt *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { +// MultiQueries runs multiple queries without having to take lock multiple times for each query. 
+func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { lt.dbsMtx.RLock() defer lt.dbsMtx.RUnlock() for _, db := range lt.dbs { - if err := lt.boltdbIndexClient.QueryDB(ctx, db, query, callback); err != nil { - return err + for _, query := range queries { + if err := lt.boltdbIndexClient.QueryDB(ctx, db, query, callback); err != nil { + return err + } } } diff --git a/pkg/storage/stores/shipper/uploads/table_manager.go b/pkg/storage/stores/shipper/uploads/table_manager.go index cdc2425a7e8f5..d9df2de73d677 100644 --- a/pkg/storage/stores/shipper/uploads/table_manager.go +++ b/pkg/storage/stores/shipper/uploads/table_manager.go @@ -18,6 +18,8 @@ import ( "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) type Config struct { @@ -88,22 +90,30 @@ func (tm *TableManager) Stop() { } func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { - return chunk_util.DoParallelQueries(ctx, tm.query, queries, callback) + queriesByTable := util.QueriesByTable(queries) + for tableName, queries := range queriesByTable { + err := tm.query(ctx, tableName, queries, callback) + if err != nil { + return err + } + } + + return nil } -func (tm *TableManager) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { +func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk_util.Callback) error { tm.tablesMtx.RLock() defer tm.tablesMtx.RUnlock() log, ctx := spanlogger.New(ctx, "Shipper.Uploads.Query") defer log.Span.Finish() - table, ok := tm.tables[query.TableName] + table, ok := tm.tables[tableName] if !ok { return nil } - return table.Query(ctx, query, callback) + return util.DoParallelQueries(ctx, table, queries, callback) } func (tm *TableManager) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { diff --git a/pkg/storage/stores/shipper/uploads/table_manager_test.go b/pkg/storage/stores/shipper/uploads/table_manager_test.go index 860c37bac731c..a69f9b4c6cc91 100644 --- a/pkg/storage/stores/shipper/uploads/table_manager_test.go +++ b/pkg/storage/stores/shipper/uploads/table_manager_test.go @@ -101,7 +101,7 @@ func TestLoadTables(t *testing.T) { require.True(t, !stat.IsDir()) for tableName, expectedIndex := range expectedTables { - testutil.TestSingleQuery(t, chunk.IndexQuery{TableName: tableName}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords) + testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{TableName: tableName}}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords) } } @@ -137,7 +137,7 @@ func TestTableManager_BatchWrite(t *testing.T) { require.Len(t, tm.tables, len(tc)) for tableName, expectedIndex := range tc { - testutil.TestSingleQuery(t, chunk.IndexQuery{TableName: tableName}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords) + testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{TableName: tableName}}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords) } } diff --git a/pkg/storage/stores/shipper/uploads/table_test.go b/pkg/storage/stores/shipper/uploads/table_test.go index fa62e4143d0c8..8b0dbf9c3745d 100644 --- a/pkg/storage/stores/shipper/uploads/table_test.go +++ b/pkg/storage/stores/shipper/uploads/table_test.go @@ -7,6 +7,7 @@ import ( "io/ioutil" 
"os" "path/filepath" + "strconv" "testing" "time" @@ -88,7 +89,7 @@ func TestLoadTable(t *testing.T) { }() // query the loaded table to see if it has right data. - testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 20) + testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, 20) } func TestTable_Write(t *testing.T) { @@ -147,7 +148,7 @@ func TestTable_Write(t *testing.T) { require.True(t, ok) // test that the table has current + previous records - testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, (i+1)*10) + testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, (i+1)*10) testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, db, boltIndexClient, i*10, 10) }) } @@ -439,7 +440,7 @@ func TestTable_ImmutableUploads(t *testing.T) { dir, err := ioutil.ReadDir(filepath.Join(objectStorageDir, table.name)) require.NoError(t, err) for _, d := range dir { - os.RemoveAll(filepath.Join(objectStorageDir, table.name, d.Name())) + require.NoError(t, os.RemoveAll(filepath.Join(objectStorageDir, table.name, d.Name()))) } // force upload of dbs @@ -450,3 +451,49 @@ func TestTable_ImmutableUploads(t *testing.T) { require.NoFileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB)))) } } + +func TestTable_MultiQueries(t *testing.T) { + indexPath, err := ioutil.TempDir("", "table-multi-queries") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(indexPath)) + }() + + boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: indexPath}) + require.NoError(t, err) + + defer func() { + boltDBIndexClient.Stop() + }() + + // setup some dbs for a table at a path. + tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, map[string]testutil.DBRecords{ + "db1": { + Start: 0, + NumRecords: 10, + }, + "db2": { + Start: 10, + NumRecords: 10, + }, + }, false) + + // try loading the table. + table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient) + require.NoError(t, err) + require.NotNil(t, table) + + defer func() { + table.Stop() + }() + + // build queries each looking for specific value from all the dbs + var queries []chunk.IndexQuery + for i := 5; i < 15; i++ { + queries = append(queries, chunk.IndexQuery{ValueEqual: []byte(strconv.Itoa(i))}) + } + + // query the loaded table to see if it has right data. + testutil.TestSingleTableQuery(t, queries, table, 5, 10) +} diff --git a/pkg/storage/stores/shipper/util/queries.go b/pkg/storage/stores/shipper/util/queries.go new file mode 100644 index 0000000000000..1645e22dc5d4a --- /dev/null +++ b/pkg/storage/stores/shipper/util/queries.go @@ -0,0 +1,50 @@ +package util + +import ( + "context" + + "github.com/cortexproject/cortex/pkg/chunk" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + "github.com/cortexproject/cortex/pkg/util" +) + +const maxQueriesPerGoroutine = 100 + +type TableQuerier interface { + MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error +} + +// QueriesByTable groups and returns queries by tables. 
+func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery {
+	queriesByTable := make(map[string][]chunk.IndexQuery)
+	for _, query := range queries {
+		if _, ok := queriesByTable[query.TableName]; !ok {
+			queriesByTable[query.TableName] = []chunk.IndexQuery{}
+		}
+
+		queriesByTable[query.TableName] = append(queriesByTable[query.TableName], query)
+	}
+
+	return queriesByTable
+}
+
+func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
+	errs := make(chan error)
+
+	for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
+		q := queries[i:util.Min(i+maxQueriesPerGoroutine, len(queries))]
+		go func(queries []chunk.IndexQuery) {
+			errs <- tableQuerier.MultiQueries(ctx, queries, callback)
+		}(q)
+	}
+
+	var lastErr error
+	for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
+		err := <-errs
+		if err != nil {
+			lastErr = err
+		}
+	}
+
+	return lastErr
+}
diff --git a/pkg/storage/stores/shipper/util/queries_test.go b/pkg/storage/stores/shipper/util/queries_test.go
new file mode 100644
index 0000000000000..7ee6c24c5b072
--- /dev/null
+++ b/pkg/storage/stores/shipper/util/queries_test.go
@@ -0,0 +1,88 @@
+package util
+
+import (
+	"context"
+	"strconv"
+	"sync"
+	"testing"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
+	"github.com/stretchr/testify/require"
+)
+
+type mockTableQuerier struct {
+	sync.Mutex
+	queries map[string]chunk.IndexQuery
+}
+
+func (m *mockTableQuerier) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
+	m.Lock()
+	defer m.Unlock()
+
+	for _, query := range queries {
+		m.queries[query.HashValue] = query
+	}
+
+	return nil
+}
+
+func (m *mockTableQuerier) hasQueries(t *testing.T, count int) {
+	require.Len(t, m.queries, count)
+	for i := 0; i < count; i++ {
+		idx := strconv.Itoa(i)
+
+		require.Equal(t, chunk.IndexQuery{
+			HashValue:  idx,
+			ValueEqual: []byte(idx),
+		}, m.queries[idx])
+	}
+}
+
+func TestDoParallelQueries(t *testing.T) {
+	for _, tc := range []struct {
+		name       string
+		queryCount int
+	}{
+		{
+			name:       "queries < maxQueriesPerGoroutine",
+			queryCount: maxQueriesPerGoroutine / 2,
+		},
+		{
+			name:       "queries = maxQueriesPerGoroutine",
+			queryCount: maxQueriesPerGoroutine,
+		},
+		{
+			name:       "queries > maxQueriesPerGoroutine",
+			queryCount: maxQueriesPerGoroutine * 2,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			queries := buildQueries(tc.queryCount)
+
+			tableQuerier := mockTableQuerier{
+				queries: map[string]chunk.IndexQuery{},
+			}
+
+			err := DoParallelQueries(context.Background(), &tableQuerier, queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
+				return false
+			})
+			require.NoError(t, err)
+
+			tableQuerier.hasQueries(t, tc.queryCount)
+		})
+	}
+}
+
+func buildQueries(n int) []chunk.IndexQuery {
+	queries := make([]chunk.IndexQuery, 0, n)
+	for i := 0; i < n; i++ {
+		idx := strconv.Itoa(i)
+		queries = append(queries, chunk.IndexQuery{
+			HashValue:  idx,
+			ValueEqual: []byte(idx),
+		})
+	}
+
+	return queries
+}
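
For reference, below is a minimal, self-contained sketch (outside the diff) of the fan-out pattern that util.QueriesByTable and util.DoParallelQueries introduce above: group queries by table name, split each table's queries into batches of at most maxQueriesPerGoroutine, and run each batch through MultiQueries in its own goroutine. IndexQuery, TableQuerier, and fakeTable here are simplified stand-ins, not the real Cortex/Loki types, and the callback argument is omitted for brevity.

// Sketch only: simplified stand-ins for the real chunk.IndexQuery and
// TableQuerier types; mirrors the structure of util.DoParallelQueries.
package main

import (
	"context"
	"fmt"
	"sync"
)

const maxQueriesPerGoroutine = 100

// IndexQuery is a simplified stand-in for chunk.IndexQuery.
type IndexQuery struct {
	TableName string
	HashValue string
}

// TableQuerier is the per-table query interface the util package expects.
type TableQuerier interface {
	MultiQueries(ctx context.Context, queries []IndexQuery) error
}

// queriesByTable groups queries by their table name.
func queriesByTable(queries []IndexQuery) map[string][]IndexQuery {
	grouped := make(map[string][]IndexQuery)
	for _, q := range queries {
		grouped[q.TableName] = append(grouped[q.TableName], q)
	}
	return grouped
}

// doParallelQueries hands batches of queries to the querier concurrently and
// returns the last error seen, like util.DoParallelQueries in the diff.
func doParallelQueries(ctx context.Context, querier TableQuerier, queries []IndexQuery) error {
	errs := make(chan error)
	batches := 0
	for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
		end := i + maxQueriesPerGoroutine
		if end > len(queries) {
			end = len(queries)
		}
		batches++
		go func(batch []IndexQuery) {
			errs <- querier.MultiQueries(ctx, batch)
		}(queries[i:end])
	}

	var lastErr error
	for i := 0; i < batches; i++ {
		if err := <-errs; err != nil {
			lastErr = err
		}
	}
	return lastErr
}

// fakeTable counts how many queries it received; it stands in for a Table.
type fakeTable struct {
	mu    sync.Mutex
	count int
}

func (f *fakeTable) MultiQueries(_ context.Context, queries []IndexQuery) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.count += len(queries)
	return nil
}

func main() {
	// Build 250 queries spread across three hypothetical tables.
	var queries []IndexQuery
	for i := 0; i < 250; i++ {
		queries = append(queries, IndexQuery{TableName: fmt.Sprintf("table_%d", i%3)})
	}

	tables := map[string]*fakeTable{"table_0": {}, "table_1": {}, "table_2": {}}
	for name, tableQueries := range queriesByTable(queries) {
		if err := doParallelQueries(context.Background(), tables[name], tableQueries); err != nil {
			fmt.Println("query error:", err)
		}
	}
	for name, tbl := range tables {
		fmt.Printf("%s received %d queries\n", name, tbl.count)
	}
}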