Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve object key parsing for boltdb shipper. #2323

Merged
merged 1 commit into from
Jul 13, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions pkg/storage/stores/local/downloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ func (fc *FilesCollection) checkStorageForUpdates(ctx context.Context) (toDownlo

fc.mtx.RLock()
for _, object := range objects {
uploader := strings.Split(object.Key, "/")[1]
uploader, err := getUploaderFromObjectKey(object.Key)
if err != nil {
return nil, nil, err
}
listedUploaders[uploader] = struct{}{}

// Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates
Expand Down Expand Up @@ -81,14 +84,17 @@ func (fc *FilesCollection) Sync(ctx context.Context) error {

// It first downloads file to a temp location so that we close the existing file(if already exists), replace it with new one and then reopen it.
func (fc *FilesCollection) downloadFile(ctx context.Context, storageObject chunk.StorageObject) error {
uploader := strings.Split(storageObject.Key, "/")[1]
uploader, err := getUploaderFromObjectKey(storageObject.Key)
if err != nil {
return err
}
folderPath, _ := fc.getFolderPathForPeriod(false)
filePath := path.Join(folderPath, uploader)

// download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists
tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", uploader, "temp"))

err := fc.getFileFromStorage(ctx, storageObject.Key, tempFilePath)
err = fc.getFileFromStorage(ctx, storageObject.Key, tempFilePath)
if err != nil {
return err
}
Expand Down Expand Up @@ -185,7 +191,11 @@ func (fc *FilesCollection) downloadAllFilesForPeriod(ctx context.Context) (err e
}

for _, object := range objects {
uploader := getUploaderFromObjectKey(object.Key)
var uploader string
uploader, err = getUploaderFromObjectKey(object.Key)
if err != nil {
return
}

filePath := path.Join(folderPath, uploader)
df := downloadedFile{}
Expand Down Expand Up @@ -232,6 +242,13 @@ func (fc *FilesCollection) getFolderPathForPeriod(ensureExists bool) (string, er
return folderPath, nil
}

func getUploaderFromObjectKey(objectKey string) string {
return strings.Split(objectKey, "/")[1]
func getUploaderFromObjectKey(objectKey string) (string, error) {
uploaders := strings.Split(objectKey, "/")
if len(uploaders) != 2 {
return "", fmt.Errorf("invalid object key: %v", objectKey)
}
if uploaders[1] == "" {
return "", fmt.Errorf("empty uploader, object key: %v", objectKey)
}
return uploaders[1], nil
}