Refactor chunk upload and make the parallel uploads configurable #68

Merged: 3 commits, Nov 28, 2018

Changes from all commits
8 changes: 5 additions & 3 deletions cmd/initializer.go
@@ -61,9 +61,11 @@ func NewInitializeCommand(stopCh <-chan struct{}) *cobra.Command {
var snapstoreConfig *snapstore.Config
if storageProvider != "" {
snapstoreConfig = &snapstore.Config{
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
MaxParallelChunkUploads: maxParallelChunkUploads,
TempDir: snapstoreTempDir,
}
}
etcdInitializer := initializer.NewInitializer(options, snapstoreConfig, logger)
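For context, the snapstore.Config literal above now carries two new knobs alongside the provider coordinates. A minimal sketch of the extended struct, with field names taken from this diff and field types assumed (the real definition lives in pkg/snapstore and is not part of this change set):

// Sketch of snapstore.Config as used in these commands; types are assumed.
type Config struct {
	Provider  string // snapshot storage provider, e.g. "ABS"
	Container string // bucket or container used as the snapstore
	Prefix    string // directory inside the container under which snapshots live
	// MaxParallelChunkUploads bounds the number of concurrent chunk uploads.
	MaxParallelChunkUploads int
	// TempDir is the local scratch directory used while staging snapshots.
	TempDir string
}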
2 changes: 2 additions & 0 deletions cmd/miscellaneous.go
@@ -23,4 +23,6 @@ func initializeSnapstoreFlags(cmd *cobra.Command) {
cmd.Flags().StringVar(&storageProvider, "storage-provider", "", "snapshot storage provider")
cmd.Flags().StringVar(&storageContainer, "store-container", "", "container which will be used as snapstore")
cmd.Flags().StringVar(&storagePrefix, "store-prefix", "", "prefix or directory inside container under which snapstore is created")
cmd.Flags().IntVar(&maxParallelChunkUploads, "max-parallel-chunk-uploads", 5, "maximum number of parallel chunk uploads allowed")
cmd.Flags().StringVar(&snapstoreTempDir, "snapstore-temp-directory", "/tmp", "temporary directory for processing")
}
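The two new flags default to 5 parallel uploads and /tmp, and feed straight into the Config sketched above. A self-contained sketch of the parsing behaviour, with hypothetical wiring (the real flag variables live in cmd/types.go and the command constructors):

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

var (
	maxParallelChunkUploads int
	snapstoreTempDir        string
)

func main() {
	cmd := &cobra.Command{
		Use: "etcdbrctl",
		Run: func(cmd *cobra.Command, args []string) {
			// With no flags given, prints the defaults: "5 /tmp".
			fmt.Println(maxParallelChunkUploads, snapstoreTempDir)
		},
	}
	cmd.Flags().IntVar(&maxParallelChunkUploads, "max-parallel-chunk-uploads", 5, "maximum number of parallel chunk uploads allowed")
	cmd.Flags().StringVar(&snapstoreTempDir, "snapstore-temp-directory", "/tmp", "temporary directory for processing")
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}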
8 changes: 5 additions & 3 deletions cmd/restore.go
@@ -50,9 +50,11 @@ func NewRestoreCommand(stopCh <-chan struct{}) *cobra.Command {
logger.Fatalf("failed parsing peers urls for restore cluster: %v", err)
}
snapstoreConfig := &snapstore.Config{
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
MaxParallelChunkUploads: maxParallelChunkUploads,
TempDir: snapstoreTempDir,
}
store, err := snapstore.GetSnapstore(snapstoreConfig)
if err != nil {
15 changes: 10 additions & 5 deletions cmd/server.go
@@ -67,9 +67,11 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {

if storageProvider != "" {
snapstoreConfig = &snapstore.Config{
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
MaxParallelChunkUploads: maxParallelChunkUploads,
TempDir: snapstoreTempDir,
}
}

@@ -135,8 +137,11 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
ssrStopCh = make(chan struct{})
go handleSsrRequest(handler, ssr, ackCh, ssrStopCh, stopCh)
go handleAckState(handler, ackCh)
go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, time.Duration(defragmentationPeriodHours)*time.Hour, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot)

if defragmentationPeriodHours < 1 {
logger.Infof("Disabling defragmentation since defragmentation period [%d] is less than 1", defragmentationPeriodHours)
} else {
go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, time.Duration(defragmentationPeriodHours)*time.Hour, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot)
}
for {
logger.Infof("Probing etcd...")
select {
16 changes: 12 additions & 4 deletions cmd/snapshot.go
@@ -33,9 +33,11 @@ func NewSnapshotCommand(stopCh <-chan struct{}) *cobra.Command {
storing snapshots on various cloud storage providers as well as local disk location.`,
Run: func(cmd *cobra.Command, args []string) {
snapstoreConfig := &snapstore.Config{
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
MaxParallelChunkUploads: maxParallelChunkUploads,
TempDir: snapstoreTempDir,
}
ss, err := snapstore.GetSnapstore(snapstoreConfig)
if err != nil {
@@ -64,7 +66,13 @@ storing snapshots on various cloud storage providers as well as local disk locat
ssr := snapshotter.NewSnapshotter(
logger,
snapshotterConfig)
go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, time.Duration(defragmentationPeriodHours)*time.Hour, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot)

if defragmentationPeriodHours < 1 {
logger.Infof("Disabling defragmentation since defragmentation period [%d] is less than 1", defragmentationPeriodHours)
} else {
go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, time.Duration(defragmentationPeriodHours)*time.Hour, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot)
}

gcStopCh := make(chan struct{})
go ssr.RunGarbageCollector(gcStopCh)
if err := ssr.Run(stopCh, true); err != nil {
8 changes: 5 additions & 3 deletions cmd/types.go
@@ -57,9 +57,11 @@ var (
restoreMaxFetchers int

//snapstore flags
storageProvider string
storageContainer string
storagePrefix string
storageProvider string
storageContainer string
storagePrefix string
maxParallelChunkUploads int
snapstoreTempDir string
)

var emptyStruct struct{}
4 changes: 0 additions & 4 deletions pkg/etcdutil/defrag.go
@@ -65,10 +65,6 @@ func defragData(tlsConfig *TLSConfig, etcdConnectionTimeout time.Duration) error

// DefragDataPeriodically defragments the data directory of each etcd member.
func DefragDataPeriodically(stopCh <-chan struct{}, tlsConfig *TLSConfig, defragmentationPeriod, etcdConnectionTimeout time.Duration, callback func()) {
if defragmentationPeriod <= time.Hour {
logrus.Infof("Disabling defragmentation since defragmentation period [%d] is less than 1", defragmentationPeriod)
return
}
logrus.Infof("Defragmentation period :%d hours", defragmentationPeriod/time.Hour)
for {
select {
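With the guard removed, DefragDataPeriodically unconditionally enters its loop, and the callers in cmd/server.go and cmd/snapshot.go above now decide whether to start it at all. The loop body is collapsed in this diff; a hedged reconstruction of its shape, assuming a time.After-based wait:

// Hedged reconstruction; the actual loop body is not shown in this diff.
func DefragDataPeriodically(stopCh <-chan struct{}, tlsConfig *TLSConfig, defragmentationPeriod, etcdConnectionTimeout time.Duration, callback func()) {
	logrus.Infof("Defragmentation period :%d hours", defragmentationPeriod/time.Hour)
	for {
		select {
		case <-stopCh:
			return
		case <-time.After(defragmentationPeriod):
			if err := defragData(tlsConfig, etcdConnectionTimeout); err != nil {
				logrus.Warnf("failed to defragment data: %v", err)
				continue
			}
			if callback != nil {
				callback()
			}
		}
	}
}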
28 changes: 28 additions & 0 deletions pkg/etcdutil/defrag_test.go
@@ -74,5 +74,33 @@ var _ = Describe("Defrag", func() {

Expect(newStatus.Header.GetRevision()).Should(BeNumerically("==", oldStatus.Header.GetRevision()))
})

It("should defrag periodically with callback", func() {
defragCount := 0
expectedDefragCount := 2
defragmentationPeriod := time.Duration(30) * time.Second
client, err := GetTLSClientForEtcd(tlsConfig)
Expect(err).ShouldNot(HaveOccurred())
defer client.Close()
ctx, cancel := context.WithTimeout(context.TODO(), etcdDialTimeout)
oldStatus, err := client.Status(ctx, endpoints[0])
cancel()
Expect(err).ShouldNot(HaveOccurred())
stopCh := make(chan struct{})
time.AfterFunc(time.Second*time.Duration(75), func() {
close(stopCh)
})

DefragDataPeriodically(stopCh, tlsConfig, defragmentationPeriod, etcdConnectionTimeout, func() { defragCount++ })

ctx, cancel = context.WithTimeout(context.TODO(), etcdDialTimeout)
newStatus, err := client.Status(ctx, endpoints[0])
cancel()
Expect(err).ShouldNot(HaveOccurred())

Expect(defragCount).Should(BeNumerically("==", expectedDefragCount))
Expect(newStatus.DbSize).Should(BeNumerically("<", oldStatus.DbSize))
Expect(newStatus.Header.GetRevision()).Should(BeNumerically("==", oldStatus.Header.GetRevision()))
})
})
})
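The expected count of 2 follows from the timing: with a 30-second period and the stop channel closed after 75 seconds, defragmentation fires at t=30s and t=60s, and the t=90s run never happens. In code:

package main

import (
	"fmt"
	"time"
)

func main() {
	window := 75 * time.Second // time.AfterFunc closes stopCh at this point
	period := 30 * time.Second // defragmentationPeriod in the test
	fmt.Println(int64(window / period)) // prints 2, the expectedDefragCount
}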
2 changes: 1 addition & 1 deletion pkg/initializer/initializer.go
@@ -50,7 +50,7 @@ func (e *EtcdInitializer) Initialize() error {
return fmt.Errorf("error while restoring corrupt data: %v", err)
}
}
return err
return nil
}

//NewInitializer creates an etcd initializer object.
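The switch from return err to return nil makes the success path explicit: once a corrupt data directory has been restored, no error should escape Initialize, whatever the stale err variable still holds. A hypothetical sketch of the surrounding control flow (only the final return appears in this hunk; the helper names are illustrative, not the real API):

// Hypothetical shape of Initialize; validate and restoreCorruptData are
// illustrative stand-ins for the real validation and restore steps.
func (e *EtcdInitializer) Initialize() error {
	err := e.validate() // non-nil err signals a corrupt data directory
	if err != nil {
		if err := e.restoreCorruptData(); err != nil {
			return fmt.Errorf("error while restoring corrupt data: %v", err)
		}
	}
	// Returning the outer err here would resurface a validation failure
	// that the restore just repaired, so the fixed code returns nil.
	return nil
}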
2 changes: 1 addition & 1 deletion pkg/initializer/validator/datavalidator.go
@@ -86,7 +86,7 @@ func (d *DataValidator) snapDir() string { return filepath.Join(d.memberDir(), "
func (d *DataValidator) backendPath() string { return filepath.Join(d.snapDir(), "db") }

//Validate performs the steps required to validate data for Etcd instance.
// The steps involed are:
// The steps involved are:
// * Check if data directory exists.
// - If data directory exists
// * Check for data directory structure.
126 changes: 71 additions & 55 deletions pkg/snapstore/abs_snapstore.go
@@ -22,8 +22,7 @@ import (
"os"
"path"
"sort"
"strings"
"time"
"sync"

"github.com/Azure/azure-sdk-for-go/storage"
"github.com/sirupsen/logrus"
@@ -36,13 +35,15 @@ const (

// ABSSnapStore is an ABS backed snapstore.
type ABSSnapStore struct {
SnapStore
prefix string
container *storage.Container
// maxParallelChunkUploads hold the maximum number of parallel chunk uploads allowed.
maxParallelChunkUploads int
tempDir string
}

// NewABSSnapStore create new ABSSnapStore from shared configuration with specified bucket
func NewABSSnapStore(container, prefix string) (*ABSSnapStore, error) {
func NewABSSnapStore(container, prefix, tempDir string, maxParallelChunkUploads int) (*ABSSnapStore, error) {
storageAccount, err := GetEnvVarOrError(absStorageAccount)
if err != nil {
return nil, err
@@ -58,11 +59,11 @@ func NewABSSnapStore(container, prefix string) (*ABSSnapStore, error) {
return nil, fmt.Errorf("create ABS client failed: %v", err)
}

return GetSnapstoreFromClient(container, prefix, &client)
return GetSnapstoreFromClient(container, prefix, tempDir, maxParallelChunkUploads, &client)
}

// GetSnapstoreFromClient returns a new ABS object for a given container using the supplied storageClient
func GetSnapstoreFromClient(container, prefix string, storageClient *storage.Client) (*ABSSnapStore, error) {
func GetSnapstoreFromClient(container, prefix, tempDir string, maxParallelChunkUploads int, storageClient *storage.Client) (*ABSSnapStore, error) {
client := storageClient.GetBlobService()

// Check if supplied container exists
@@ -77,8 +78,10 @@ func GetSnapstoreFromClient(container, prefix string, storageClient *storage.Cli
}

return &ABSSnapStore{
prefix: prefix,
container: containerRef,
prefix: prefix,
container: containerRef,
maxParallelChunkUploads: maxParallelChunkUploads,
tempDir: tempDir,
}, nil
}

@@ -115,7 +118,7 @@ func (a *ABSSnapStore) List() (SnapList, error) {
// Save will write the snapshot to store
func (a *ABSSnapStore) Save(snap Snapshot, r io.Reader) error {
// Save it locally
tmpfile, err := ioutil.TempFile(tmpDir, tmpBackupFilePrefix)
tmpfile, err := ioutil.TempFile(a.tempDir, tmpBackupFilePrefix)
if err != nil {
return fmt.Errorf("failed to create snapshot tempfile: %v", err)
}
@@ -129,42 +132,61 @@ func (a *ABSSnapStore) Save(snap Snapshot, r io.Reader) error {
}

var (
errCh = make(chan chunkUploadError)
chunkSize = minChunkSize
noOfChunks = size / chunkSize
)
if size%chunkSize != 0 {
noOfChunks++
}

var (
chunkUploadCh = make(chan chunk, noOfChunks)
resCh = make(chan chunkUploadResult, noOfChunks)
wg sync.WaitGroup
cancelCh = make(chan struct{})
)

for i := 0; i < a.maxParallelChunkUploads; i++ {
wg.Add(1)
go a.blockUploader(&wg, cancelCh, &snap, tmpfile, chunkUploadCh, resCh)
}
logrus.Infof("Uploading snapshot of size: %d, chunkSize: %d, noOfChunks: %d", size, chunkSize, noOfChunks)
for offset := int64(0); offset <= size; offset += int64(chunkSize) {
go retryBlockUpload(a, &snap, tmpfile, offset, chunkSize, errCh)
}

snapshotErr := collectChunkUploadError(errCh, noOfChunks)
if len(snapshotErr) == 0 {
logrus.Info("All chunk uploaded successfully. Uploading blocklist.")
blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName)
blob := a.container.GetBlobReference(blobName)
var blockList []storage.Block
for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ {
block := storage.Block{
ID: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber))),
Status: storage.BlockStatusUncommitted,
}
blockList = append(blockList, block)
for offset, index := int64(0), 1; offset <= size; offset += int64(chunkSize) {
newChunk := chunk{
offset: offset,
size: chunkSize,
id: index,
}
chunkUploadCh <- newChunk
index++
}
logrus.Infof("Triggered chunk upload for all chunks, total: %d", noOfChunks)

snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks)
wg.Wait()

if snapshotErr != nil {
return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err)
}
logrus.Info("All chunk uploaded successfully. Uploading blocklist.")
blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName)
blob := a.container.GetBlobReference(blobName)
var blockList []storage.Block
for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ {
block := storage.Block{
ID: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber))),
Status: storage.BlockStatusUncommitted,
}
return blob.PutBlockList(blockList, &storage.PutBlockListOptions{})
blockList = append(blockList, block)
}
var collectedErr []string
for _, chunkErr := range snapshotErr {
collectedErr = append(collectedErr, fmt.Sprintf("failed uploading chunk with offset %d with error %v", chunkErr.offset, chunkErr.err))
if err := blob.PutBlockList(blockList, &storage.PutBlockListOptions{}); err != nil {
return fmt.Errorf("failed uploading blocklist for snapshot with error: %v", err)
}
return fmt.Errorf(strings.Join(collectedErr, "\n"))
logrus.Info("Blocklist uploaded successfully.")
return nil
}

func uploadBlock(s *ABSSnapStore, snap *Snapshot, file *os.File, offset, chunkSize int64) error {
func (a *ABSSnapStore) uploadBlock(snap *Snapshot, file *os.File, offset, chunkSize int64) error {
fileInfo, err := file.Stat()
if err != nil {
return err
@@ -176,38 +198,32 @@ func uploadBlock(s *ABSSnapStore, snap *Snapshot, file *os.File, offset, chunkSi
}

sr := io.NewSectionReader(file, offset, chunkSize)
blobName := path.Join(s.prefix, snap.SnapDir, snap.SnapName)
blob := s.container.GetBlobReference(blobName)
blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName)
blob := a.container.GetBlobReference(blobName)
partNumber := ((offset / chunkSize) + 1)
blockID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber)))
err = blob.PutBlockWithLength(blockID, uint64(size), sr, &storage.PutBlockOptions{})
return err
}

func retryBlockUpload(s *ABSSnapStore, snap *Snapshot, file *os.File, offset, chunkSize int64, errCh chan<- chunkUploadError) {
var (
maxAttempts uint = 5
curAttempt uint = 1
err error
)
func (a *ABSSnapStore) blockUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, snap *Snapshot, file *os.File, chunkUploadCh chan chunk, errCh chan<- chunkUploadResult) {
defer wg.Done()
for {
logrus.Infof("Uploading chunk with offset : %d, attempt: %d", offset, curAttempt)
err = uploadBlock(s, snap, file, offset, chunkSize)
logrus.Infof("For chunk upload of offset %d, err %v", offset, err)
if err == nil || curAttempt == maxAttempts {
break
select {
case <-stopCh:
return
case chunk, ok := <-chunkUploadCh:
if !ok {
return
}
logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt)
err := a.uploadBlock(snap, file, chunk.offset, chunk.size)
errCh <- chunkUploadResult{
err: err,
chunk: &chunk,
}
}
delayTime := (1 << curAttempt)
curAttempt++
logrus.Warnf("Will try to upload chunk with offset: %d at attempt %d after %d seconds", offset, curAttempt, delayTime)
time.Sleep((time.Duration)(delayTime) * time.Second)
}

errCh <- chunkUploadError{
err: err,
offset: offset,
}
return
}

// Delete should delete the snapshot file from store
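The refactored Save replaces the old scheme of one retrying goroutine per chunk with a fixed worker pool: it enqueues one chunk descriptor per offset, maxParallelChunkUploads workers drain the queue, and a collector re-enqueues failed chunks until a retry budget runs out. The chunk, chunkUploadResult, and collectChunkUploadError helpers are referenced above but defined in a shared snapstore file outside this diff; a sketch under that assumption, with the retry budget assumed:

// Hedged sketch of the shared helpers referenced in Save; the real
// definitions live elsewhere in pkg/snapstore and may differ in detail.
type chunk struct {
	offset  int64
	size    int64
	id      int
	attempt uint
}

type chunkUploadResult struct {
	err   error
	chunk *chunk
}

// collectChunkUploadError waits for one result per chunk, re-enqueueing
// failures until an assumed retry budget is exhausted. On success it closes
// chunkUploadCh so idle workers exit; on a permanent failure it closes
// cancelCh so all workers stop, and returns the failing result.
func collectChunkUploadError(chunkUploadCh chan<- chunk, resCh <-chan chunkUploadResult, cancelCh chan struct{}, noOfChunks int64) *chunkUploadResult {
	const maxAttempts = 5 // assumed retry budget
	remaining := noOfChunks
	for remaining > 0 {
		res := <-resCh
		if res.err == nil {
			remaining--
			continue
		}
		if res.chunk.attempt >= maxAttempts {
			close(cancelCh)
			return &res
		}
		res.chunk.attempt++
		chunkUploadCh <- *res.chunk
	}
	close(chunkUploadCh)
	return nil
}

Compared with the old retryBlockUpload path, which spawned a goroutine per chunk and slept with exponential backoff inside each one, the pool bounds concurrency and memory for large snapshots and keeps all retry accounting in one place.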