Skip to content

Commit

Permalink
Boltdb shipper query readiness (#2911)
Browse files Browse the repository at this point in the history
* add support for downloading index on startup in boltdb-shipper for query readiness

* tests

* pass context to load tables function to be able to cancel the operation

* add missing call to tablemanager.stop when loading of tables fail
  • Loading branch information
sandeepsukhani authored Nov 18, 2020
1 parent 1c8dd3e commit cd723c0
Show file tree
Hide file tree
Showing 5 changed files with 453 additions and 73 deletions.
144 changes: 95 additions & 49 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -40,11 +42,6 @@ type StorageClient interface {
List(ctx context.Context, prefix, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error)
}

type downloadedFile struct {
mtime time.Time
boltdb *bbolt.DB
}

// Table is a collection of multiple files created for a same table by various ingesters.
// All the public methods are concurrency safe and take care of mutexes to avoid any data race.
type Table struct {
Expand All @@ -55,7 +52,7 @@ type Table struct {
boltDBIndexClient BoltDBIndexClient

lastUsedAt time.Time
dbs map[string]*downloadedFile
dbs map[string]*bbolt.DB
dbsMtx sync.RWMutex
err error

Expand All @@ -73,7 +70,7 @@ func NewTable(spanCtx context.Context, name, cacheLocation string, storageClient
storageClient: storageClient,
boltDBIndexClient: boltDBIndexClient,
lastUsedAt: time.Now(),
dbs: map[string]*downloadedFile{},
dbs: map[string]*bbolt.DB{},
ready: make(chan struct{}),
cancelFunc: cancel,
}
Expand All @@ -100,6 +97,73 @@ func NewTable(spanCtx context.Context, name, cacheLocation string, storageClient
return &table
}

// LoadTable loads a table from local storage(syncs the table too if we have it locally) or downloads it from the shared store.
func LoadTable(ctx context.Context, name, cacheLocation string, storageClient StorageClient, boltDBIndexClient BoltDBIndexClient, metrics *metrics) (*Table, error) {
// see if folder for table already exists.
folderPath := path.Join(cacheLocation, name)
_, err := os.Stat(folderPath)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}

// folder for table doesn't exist, this means we have to download it from the shared store.
table := NewTable(ctx, name, cacheLocation, storageClient, boltDBIndexClient, metrics)
<-table.ready
if table.err != nil {
return nil, table.err
}
return table, nil
}

// folder for table already exists, open all the boltdb files from it.
filesInfo, err := ioutil.ReadDir(folderPath)
if err != nil {
return nil, err
}

table := Table{
name: name,
cacheLocation: cacheLocation,
metrics: metrics,
storageClient: storageClient,
boltDBIndexClient: boltDBIndexClient,
lastUsedAt: time.Now(),
dbs: map[string]*bbolt.DB{},
ready: make(chan struct{}),
cancelFunc: func() {},
}

level.Debug(util.Logger).Log("msg", fmt.Sprintf("opening locally present files for table %s", name), "files", fmt.Sprint(filesInfo))

for _, fileInfo := range filesInfo {
if fileInfo.IsDir() {
continue
}

// if we fail to open a boltdb file, lets skip it and let sync operation re-download the file from storage.
boltdb, err := local.OpenBoltdbFile(filepath.Join(folderPath, fileInfo.Name()))
if err != nil {
level.Error(util.Logger).Log("msg", fmt.Sprintf("failed to open existing boltdb file %s, continuing without it to let the sync operation catch up", filepath.Join(folderPath, fileInfo.Name())), "err", err)
continue
}

table.dbs[fileInfo.Name()] = boltdb
}

level.Debug(util.Logger).Log("msg", fmt.Sprintf("syncing files for table %s", name))
// sync the table to get new files and remove the deleted ones from storage.
err = table.Sync(ctx)
if err != nil {
return nil, err
}

// close the ready channel because the query function waits for it to be closed before performing queries.
close(table.ready)

return &table, nil
}

// init downloads all the db files for the table from object storage.
// it assumes the locking of mutex is taken care of by the caller.
func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) {
Expand Down Expand Up @@ -154,10 +218,7 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) {
}

filePath := path.Join(folderPath, dbName)
df := downloadedFile{}

df.mtime = object.ModifiedAt
df.boltdb, err = local.OpenBoltdbFile(filePath)
boltdb, err := local.OpenBoltdbFile(filePath)
if err != nil {
return err
}
Expand All @@ -170,7 +231,7 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) {

totalFilesSize += stat.Size()

t.dbs[dbName] = &df
t.dbs[dbName] = boltdb
}

duration := time.Since(startTime).Seconds()
Expand All @@ -190,18 +251,12 @@ func (t *Table) Close() {
defer t.dbsMtx.Unlock()

for name, db := range t.dbs {
dbPath := db.boltdb.Path()

if err := db.boltdb.Close(); err != nil {
if err := db.Close(); err != nil {
level.Error(util.Logger).Log("msg", fmt.Sprintf("failed to close file %s for table %s", name, t.name), "err", err)
}

if err := os.Remove(dbPath); err != nil {
level.Error(util.Logger).Log("msg", fmt.Sprintf("failed to remove file %s for table %s", name, t.name), "err", err)
}
}

t.dbs = map[string]*downloadedFile{}
t.dbs = map[string]*bbolt.DB{}
}

// MultiQueries runs multiple queries without having to take lock multiple times for each query.
Expand Down Expand Up @@ -230,7 +285,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca
id := shipper_util.NewIndexDeduper(callback)

for name, db := range t.dbs {
err := db.boltdb.View(func(tx *bbolt.Tx) error {
err := db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
if bucket == nil {
return nil
Expand Down Expand Up @@ -267,7 +322,12 @@ func (t *Table) CleanupAllDBs() error {
return err
}
}
return nil

tablePath, err := t.folderPathForTable(false)
if err != nil {
return err
}
return os.RemoveAll(tablePath)
}

// Err returns the err which is usually set when there was any issue in init.
Expand All @@ -280,15 +340,19 @@ func (t *Table) LastUsedAt() time.Time {
return t.lastUsedAt
}

func (t *Table) UpdateLastUsedAt() {
t.lastUsedAt = time.Now()
}

func (t *Table) cleanupDB(fileName string) error {
df, ok := t.dbs[fileName]
if !ok {
return fmt.Errorf("file %s not found in files collection for cleaning up", fileName)
}

filePath := df.boltdb.Path()
filePath := df.Path()

if err := df.boltdb.Close(); err != nil {
if err := df.Close(); err != nil {
return err
}

Expand Down Expand Up @@ -352,9 +416,10 @@ func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []chunk.
}
listedDBs[dbName] = struct{}{}

// Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates
downloadedFileDetails, ok := t.dbs[dbName]
if !ok || downloadedFileDetails.mtime != object.ModifiedAt {
// Checking whether file was already downloaded, if not, download it.
// We do not ever upload files in the object store with the same name but different contents so we do not consider downloading modified files again.
_, ok := t.dbs[dbName]
if !ok {
toDownload = append(toDownload, object)
}
}
Expand All @@ -379,39 +444,20 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj
folderPath, _ := t.folderPathForTable(false)
filePath := path.Join(folderPath, dbName)

// download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists
tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", dbName, "temp"))

err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, tempFilePath)
err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath)
if err != nil {
return err
}

t.dbsMtx.Lock()
defer t.dbsMtx.Unlock()

df, ok := t.dbs[dbName]
if ok {
if err := df.boltdb.Close(); err != nil {
return err
}
} else {
df = &downloadedFile{}
}

// move the file from temp location to actual location
err = os.Rename(tempFilePath, filePath)
if err != nil {
return err
}

df.mtime = storageObject.ModifiedAt
df.boltdb, err = local.OpenBoltdbFile(filePath)
boltdb, err := local.OpenBoltdbFile(filePath)
if err != nil {
return err
}

t.dbs[dbName] = df
t.dbs[dbName] = boltdb

return nil
}
Expand Down
Loading

0 comments on commit cd723c0

Please sign in to comment.