Merge pull request restic#2941 from MichaelEischer/parallel-repack
prune: Parallelize repack step
fd0 authored Nov 5, 2020
2 parents 8a0dbe7 + ae5302c commit 636b2f2
Showing 4 changed files with 153 additions and 66 deletions.
8 changes: 8 additions & 0 deletions changelog/unreleased/pull-2941
@@ -0,0 +1,8 @@
+Enhancement: Speed up repacking step of prune command
+
+The repack step of the prune command, which moves still used file parts into
+new pack files such that the old ones can be garbage collected later on, now
+processes multiple pack files in parallel. This is especially beneficial for
+high latency backends or when using a fast network connection.
+
+https://github.com/restic/restic/pull/2941
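
The shape behind this speed-up, visible in the diff below, is a bounded worker pool fed through a channel under an errgroup. As a rough standalone sketch of that pattern only (illustrative names such as processPacks and handle are not restic's API; the real code follows in the diff):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// processPacks sketches the fan-out pattern the PR adopts: one goroutine
// feeds pack IDs into a channel, several workers consume them in parallel,
// and the errgroup cancels everything on the first error.
func processPacks(ctx context.Context, packs []string, numWorkers int, handle func(string) error) error {
	wg, ctx := errgroup.WithContext(ctx)

	queue := make(chan string)
	wg.Go(func() error {
		defer close(queue)
		for _, id := range packs {
			select {
			case queue <- id:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	for i := 0; i < numWorkers; i++ {
		wg.Go(func() error {
			for id := range queue {
				if err := handle(id); err != nil {
					return err
				}
			}
			return nil
		})
	}

	return wg.Wait()
}

func main() {
	err := processPacks(context.Background(), []string{"a", "b", "c"}, 2, func(id string) error {
		fmt.Println("repacking", id)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```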
202 changes: 140 additions & 62 deletions internal/repository/repack.go
@@ -2,18 +2,26 @@ package repository
 
 import (
     "context"
+    "os"
+    "sync"
 
     "github.com/restic/restic/internal/debug"
     "github.com/restic/restic/internal/errors"
     "github.com/restic/restic/internal/fs"
     "github.com/restic/restic/internal/pack"
     "github.com/restic/restic/internal/restic"
+    "golang.org/x/sync/errgroup"
 )
 
+const numRepackWorkers = 8
+
 // Repack takes a list of packs together with a list of blobs contained in
 // these packs. Each pack is loaded and the blobs listed in keepBlobs is saved
 // into a new pack. Returned is the list of obsolete packs which can then
 // be removed.
+//
+// The map keepBlobs is modified by Repack, it is used to keep track of which
+// blobs have been processed.
 func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *restic.Progress) (obsoletePacks restic.IDSet, err error) {
     if p != nil {
         p.Start()
@@ -22,91 +30,161 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *restic.Progress) (obsoletePacks restic.IDSet, err error) {
 
     debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
 
-    for packID := range packs {
-        // load the complete pack into a temp file
-        h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
-
-        tempfile, hash, packLength, err := DownloadAndHash(ctx, repo.Backend(), h)
-        if err != nil {
-            return nil, errors.Wrap(err, "Repack")
-        }
-
-        debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash)
-
-        if !packID.Equal(hash) {
-            return nil, errors.Errorf("hash does not match id: want %v, got %v", packID, hash)
-        }
-
-        _, err = tempfile.Seek(0, 0)
-        if err != nil {
-            return nil, errors.Wrap(err, "Seek")
-        }
-
-        blobs, err := pack.List(repo.Key(), tempfile, packLength)
-        if err != nil {
-            return nil, err
-        }
-
-        debug.Log("processing pack %v, blobs: %v", packID, len(blobs))
-        var buf []byte
-        for _, entry := range blobs {
-            h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
-            if !keepBlobs.Has(h) {
-                continue
-            }
-
-            debug.Log("  process blob %v", h)
-
-            if uint(cap(buf)) < entry.Length {
-                buf = make([]byte, entry.Length)
-            }
-            buf = buf[:entry.Length]
-
-            n, err := tempfile.ReadAt(buf, int64(entry.Offset))
-            if err != nil {
-                return nil, errors.Wrap(err, "ReadAt")
-            }
-
-            if n != len(buf) {
-                return nil, errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
-                    h, tempfile.Name(), len(buf), n)
-            }
-
-            nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():]
-            plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
-            if err != nil {
-                return nil, err
-            }
-
-            id := restic.Hash(plaintext)
-            if !id.Equal(entry.ID) {
-                debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v",
-                    h.Type, h.ID, tempfile.Name(), id)
-                return nil, errors.Errorf("read blob %v from %v: wrong data returned, hash is %v",
-                    h, tempfile.Name(), id)
-            }
-
-            // We do want to save already saved blobs!
-            _, _, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID, true)
-            if err != nil {
-                return nil, err
-            }
-
-            debug.Log("  saved blob %v", entry.ID)
-
-            keepBlobs.Delete(h)
-        }
-
-        if err = tempfile.Close(); err != nil {
-            return nil, errors.Wrap(err, "Close")
-        }
-
-        if err = fs.RemoveIfExists(tempfile.Name()); err != nil {
-            return nil, errors.Wrap(err, "Remove")
-        }
-
-        if p != nil {
-            p.Report(restic.Stat{Blobs: 1})
-        }
-    }
+    wg, ctx := errgroup.WithContext(ctx)
+
+    downloadQueue := make(chan restic.ID)
+    wg.Go(func() error {
+        defer close(downloadQueue)
+        for packID := range packs {
+            select {
+            case downloadQueue <- packID:
+            case <-ctx.Done():
+                return ctx.Err()
+            }
+        }
+        return nil
+    })
+
+    type repackJob struct {
+        tempfile   *os.File
+        hash       restic.ID
+        packLength int64
+    }
+    processQueue := make(chan repackJob)
+    // used to close processQueue once all downloaders have finished
+    var downloadWG sync.WaitGroup
+
+    downloader := func() error {
+        defer downloadWG.Done()
+        for packID := range downloadQueue {
+            // load the complete pack into a temp file
+            h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
+
+            tempfile, hash, packLength, err := DownloadAndHash(ctx, repo.Backend(), h)
+            if err != nil {
+                return errors.Wrap(err, "Repack")
+            }
+
+            debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash)
+
+            if !packID.Equal(hash) {
+                return errors.Errorf("hash does not match id: want %v, got %v", packID, hash)
+            }
+
+            select {
+            case processQueue <- repackJob{tempfile, hash, packLength}:
+            case <-ctx.Done():
+                return ctx.Err()
+            }
+        }
+        return nil
+    }
+
+    downloadWG.Add(numRepackWorkers)
+    for i := 0; i < numRepackWorkers; i++ {
+        wg.Go(downloader)
+    }
+    wg.Go(func() error {
+        downloadWG.Wait()
+        close(processQueue)
+        return nil
+    })
+
+    var keepMutex sync.Mutex
+    worker := func() error {
+        for job := range processQueue {
+            tempfile, packID, packLength := job.tempfile, job.hash, job.packLength
+
+            blobs, err := pack.List(repo.Key(), tempfile, packLength)
+            if err != nil {
+                return err
+            }
+
+            debug.Log("processing pack %v, blobs: %v", packID, len(blobs))
+            var buf []byte
+            for _, entry := range blobs {
+                h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
+
+                keepMutex.Lock()
+                shouldKeep := keepBlobs.Has(h)
+                keepMutex.Unlock()
+
+                if !shouldKeep {
+                    continue
+                }
+
+                debug.Log("  process blob %v", h)
+
+                if uint(cap(buf)) < entry.Length {
+                    buf = make([]byte, entry.Length)
+                }
+                buf = buf[:entry.Length]
+
+                n, err := tempfile.ReadAt(buf, int64(entry.Offset))
+                if err != nil {
+                    return errors.Wrap(err, "ReadAt")
+                }
+
+                if n != len(buf) {
+                    return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
+                        h, tempfile.Name(), len(buf), n)
+                }
+
+                nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():]
+                plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
+                if err != nil {
+                    return err
+                }
+
+                id := restic.Hash(plaintext)
+                if !id.Equal(entry.ID) {
+                    debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v",
+                        h.Type, h.ID, tempfile.Name(), id)
+                    return errors.Errorf("read blob %v from %v: wrong data returned, hash is %v",
+                        h, tempfile.Name(), id)
+                }
+
+                keepMutex.Lock()
+                // recheck whether some other worker was faster
+                shouldKeep = keepBlobs.Has(h)
+                if shouldKeep {
+                    keepBlobs.Delete(h)
+                }
+                keepMutex.Unlock()
+
+                if !shouldKeep {
+                    continue
+                }
+
+                // We do want to save already saved blobs!
+                _, _, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID, true)
+                if err != nil {
+                    return err
+                }
+
+                debug.Log("  saved blob %v", entry.ID)
+            }
+
+            if err = tempfile.Close(); err != nil {
+                return errors.Wrap(err, "Close")
+            }
+
+            if err = fs.RemoveIfExists(tempfile.Name()); err != nil {
+                return errors.Wrap(err, "Remove")
+            }
+
+            if p != nil {
+                p.Report(restic.Stat{Blobs: 1})
+            }
+        }
+        return nil
+    }
+
+    for i := 0; i < numRepackWorkers; i++ {
+        wg.Go(worker)
+    }
+
+    if err := wg.Wait(); err != nil {
+        return nil, err
+    }
 
     if err := repo.Flush(ctx); err != nil {
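
A subtle point in the worker above is the double-checked use of keepBlobs: each worker peeks under keepMutex before doing the expensive read and decrypt, then rechecks and deletes under the lock just before saving, so a blob that occurs in several packs is written exactly once even when two workers race on it. A minimal standalone sketch of that claim pattern (the claimSet type and its methods are hypothetical, not restic code):

```go
package main

import (
	"fmt"
	"sync"
)

// claimSet lets concurrent workers claim each key exactly once, mirroring
// the keepMutex/keepBlobs check-recheck-delete sequence in the diff above.
type claimSet struct {
	mu   sync.Mutex
	keys map[string]struct{}
}

// peek reports whether the key is still unclaimed; a cheap pre-check so a
// worker can skip expensive work for keys already handled elsewhere.
func (c *claimSet) peek(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.keys[key]
	return ok
}

// claim atomically rechecks and removes the key; only the single worker
// that sees true here proceeds to save, so duplicates are impossible even
// if several workers passed peek for the same key.
func (c *claimSet) claim(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.keys[key]; !ok {
		return false // some other worker was faster
	}
	delete(c.keys, key)
	return true
}

func main() {
	cs := &claimSet{keys: map[string]struct{}{"blob1": {}}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if cs.peek("blob1") && cs.claim("blob1") {
				fmt.Printf("worker %d saved blob1\n", id) // printed exactly once
			}
		}(i)
	}
	wg.Wait()
}
```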
7 changes: 4 additions & 3 deletions internal/repository/repack_test.go
@@ -4,6 +4,7 @@ import (
     "context"
     "math/rand"
     "testing"
+    "time"
 
     "github.com/restic/restic/internal/index"
     "github.com/restic/restic/internal/repository"
@@ -195,7 +196,7 @@ func TestRepack(t *testing.T) {
     repo, cleanup := repository.TestRepository(t)
     defer cleanup()
 
-    seed := rand.Int63()
+    seed := time.Now().UnixNano()
     rand.Seed(seed)
     t.Logf("rand seed is %v", seed)
 
@@ -262,7 +263,7 @@ func TestRepackWrongBlob(t *testing.T) {
     repo, cleanup := repository.TestRepository(t)
     defer cleanup()
 
-    seed := rand.Int63()
+    seed := time.Now().UnixNano()
     rand.Seed(seed)
     t.Logf("rand seed is %v", seed)
 
@@ -277,5 +278,5 @@ func TestRepackWrongBlob(t *testing.T) {
     if err == nil {
         t.Fatal("expected repack to fail but got no error")
     }
-    t.Log(err)
+    t.Logf("found expected error: %v", err)
 }
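
Both tests now derive the seed from the wall clock and log it, so a randomized failure can be replayed by re-seeding with the logged value. A small example of the same pattern (a hypothetical test, not part of this diff):

```go
package example

import (
	"math/rand"
	"testing"
	"time"
)

// TestWithLoggedSeed shows the seed-and-log pattern: the input is
// randomized, but the logged seed makes any failure reproducible.
func TestWithLoggedSeed(t *testing.T) {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
	t.Logf("rand seed is %v", seed)

	n := rand.Intn(100) // randomized test input
	if n < 0 {
		t.Fatalf("unexpected negative value %d (reproduce with seed %d)", n, seed)
	}
}
```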
2 changes: 1 addition & 1 deletion internal/repository/repository.go
@@ -758,7 +758,7 @@ type Loader interface {
 
 // DownloadAndHash is all-in-one helper to download content of the file at h to a temporary filesystem location
 // and calculate ID of the contents. Returned (temporary) file is positioned at the beginning of the file;
-// it is reponsibility of the caller to close and delete the file.
+// it is the responsibility of the caller to close and delete the file.
 func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile *os.File, hash restic.ID, size int64, err error) {
     tmpfile, err = fs.TempFile("", "restic-temp-")
     if err != nil {
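
Per this doc comment, the caller of DownloadAndHash receives an open temp file already positioned at offset zero and must close and delete it itself. A sketch of that caller-side contract (loadPack is a hypothetical helper; it uses restic's internal packages, so it would only compile inside the restic source tree):

```go
package example

import (
	"context"

	"github.com/restic/restic/internal/errors"
	"github.com/restic/restic/internal/fs"
	"github.com/restic/restic/internal/repository"
	"github.com/restic/restic/internal/restic"
)

// loadPack downloads one pack file and returns its content hash, honoring
// the documented contract: the returned temp file starts at offset zero,
// and the caller closes and deletes it (here via defer).
func loadPack(ctx context.Context, be repository.Loader, id restic.ID) (restic.ID, error) {
	h := restic.Handle{Type: restic.PackFile, Name: id.String()}

	tempfile, hash, _, err := repository.DownloadAndHash(ctx, be, h)
	if err != nil {
		return restic.ID{}, errors.Wrap(err, "DownloadAndHash")
	}
	defer func() {
		_ = tempfile.Close()
		_ = fs.RemoveIfExists(tempfile.Name())
	}()

	// tempfile can be read from offset 0 here without seeking first.
	return hash, nil
}
```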
