From dd6547903f7365927b0bd72f1d12eaaee9e1e624 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 8 Jul 2020 16:44:44 -0400 Subject: [PATCH] Improve object key parsing for boltdb shipper. Signed-off-by: Cyril Tovena --- pkg/storage/stores/local/downloads.go | 29 +++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/pkg/storage/stores/local/downloads.go b/pkg/storage/stores/local/downloads.go index c4b8542098ecc..69eb55b23f7a9 100644 --- a/pkg/storage/stores/local/downloads.go +++ b/pkg/storage/stores/local/downloads.go @@ -29,7 +29,10 @@ func (fc *FilesCollection) checkStorageForUpdates(ctx context.Context) (toDownlo fc.mtx.RLock() for _, object := range objects { - uploader := strings.Split(object.Key, "/")[1] + uploader, err := getUploaderFromObjectKey(object.Key) + if err != nil { + return nil, nil, err + } listedUploaders[uploader] = struct{}{} // Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates @@ -81,14 +84,17 @@ func (fc *FilesCollection) Sync(ctx context.Context) error { // It first downloads file to a temp location so that we close the existing file(if already exists), replace it with new one and then reopen it. func (fc *FilesCollection) downloadFile(ctx context.Context, storageObject chunk.StorageObject) error { - uploader := strings.Split(storageObject.Key, "/")[1] + uploader, err := getUploaderFromObjectKey(storageObject.Key) + if err != nil { + return err + } folderPath, _ := fc.getFolderPathForPeriod(false) filePath := path.Join(folderPath, uploader) // download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", uploader, "temp")) - err := fc.getFileFromStorage(ctx, storageObject.Key, tempFilePath) + err = fc.getFileFromStorage(ctx, storageObject.Key, tempFilePath) if err != nil { return err } @@ -185,7 +191,11 @@ func (fc *FilesCollection) downloadAllFilesForPeriod(ctx context.Context) (err e } for _, object := range objects { - uploader := getUploaderFromObjectKey(object.Key) + var uploader string + uploader, err = getUploaderFromObjectKey(object.Key) + if err != nil { + return + } filePath := path.Join(folderPath, uploader) df := downloadedFile{} @@ -232,6 +242,13 @@ func (fc *FilesCollection) getFolderPathForPeriod(ensureExists bool) (string, er return folderPath, nil } -func getUploaderFromObjectKey(objectKey string) string { - return strings.Split(objectKey, "/")[1] +func getUploaderFromObjectKey(objectKey string) (string, error) { + uploaders := strings.Split(objectKey, "/") + if len(uploaders) != 2 { + return "", fmt.Errorf("invalid object key: %v", objectKey) + } + if uploaders[1] == "" { + return "", fmt.Errorf("empty uploader, object key: %v", objectKey) + } + return uploaders[1], nil }