Skip to content

Commit

Permalink
Add option to regenerate invalid seed indexes (#216)
Browse files Browse the repository at this point in the history
* Add option to regenerate invalid seed indexes

Sometimes we don't have the guarantee that a particular seed index is
always valid.
For example if a seed index is calculated and used in two different
occasions, it might happen that in the meantime some of the seed files
got corrupted or that they simply changed (if the seed is a writable
location).

One workaround is to manually launch `verify-index` and then regenerate
the index if that check fails. However this involves additional steps
and is potentially slower because we need to verify the entire seed
before running the `extract` command.

Instead with the new option "--regenerate-invalid-seeds", Desync will
automatically regenerate the seed index if the validation step fails.

Signed-off-by: Ludovico de Nittis <[email protected]>

* Add the correct seed in the extract test

This test usually worked because "blob2.caibx" was considered before
"blob2_corrupted.caibx". However this test was expecting to use
"blob1.caibx", but that was not the case because in `readSeedDirs()` the
index file is skipped if it is what we have set in input.

Instead, we add it explicitly to force "blob1.caibx" as a seed.

Signed-off-by: Ludovico de Nittis <[email protected]>
  • Loading branch information
RyuzakiKK authored Apr 3, 2022
1 parent 16ddd74 commit 47914b8
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 15 deletions.
23 changes: 18 additions & 5 deletions assemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
)

// InvalidSeedAction represent the action that we will take if a seed
// happens to be invalid. There are currently two options: either fail with
// an error or skip the invalid seed and try to continue.
// happens to be invalid. There are currently three options:
// - fail with an error
// - skip the invalid seed and try to continue
// - regenerate the invalid seed index
type InvalidSeedAction int

const (
	// InvalidSeedActionBailOut aborts the assembly and returns the
	// validation error to the caller.
	InvalidSeedActionBailOut InvalidSeedAction = iota
	// InvalidSeedActionSkip discards the invalid seed and retries the
	// plan without it.
	InvalidSeedActionSkip
	// InvalidSeedActionRegenerate rebuilds the seed's index, in memory,
	// from the data currently on disk and retries the plan.
	InvalidSeedActionRegenerate
)

type AssembleOptions struct {
Expand Down Expand Up @@ -225,11 +228,21 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
for {
if err := plan.Validate(ctx, options.N); err != nil {
// This plan has at least one invalid seed
if options.InvalidSeedAction == InvalidSeedActionBailOut {
switch options.InvalidSeedAction {
case InvalidSeedActionBailOut:
return stats, err
case InvalidSeedActionRegenerate:
Log.WithError(err).Info("Unable to use one of the chosen seeds, regenerating it")
if err := seq.RegenerateInvalidSeeds(ctx, options.N); err != nil {
return stats, err
}
case InvalidSeedActionSkip:
// Recreate the plan. This time the seed marked as invalid will be skipped
Log.WithError(err).Info("Unable to use one of the chosen seeds, skipping it")
default:
panic("Unhandled InvalidSeedAction")
}
// Skip the invalid seed and try again
Log.WithError(err).Info("Unable to use one of the chosen seeds, skipping it")

seq.Rewind()
plan = seq.Plan()
continue
Expand Down
30 changes: 21 additions & 9 deletions cmd/desync/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (

type extractOptions struct {
cmdStoreOptions
stores []string
cache string
seeds []string
seedDirs []string
inPlace bool
printStats bool
skipInvalidSeeds bool
stores []string
cache string
seeds []string
seedDirs []string
inPlace bool
printStats bool
skipInvalidSeeds bool
regenerateInvalidSeeds bool
}

func newExtractCommand(ctx context.Context) *cobra.Command {
Expand All @@ -33,11 +34,15 @@ func newExtractCommand(ctx context.Context) *cobra.Command {
When using -k, the blob will be extracted in-place utilizing existing data and
the target file will not be deleted on error. This can be used to restart a
failed prior extraction without having to retrieve completed chunks again.
Muptiple optional seed indexes can be given with -seed. The matching blob needs
Multiple optional seed indexes can be given with -seed. The matching blob needs
to have the same name as the indexfile without the .caibx extension. If several
seed files and indexes are available, the -seed-dir option can be used to
automatically select call .caibx files in a directory as seeds. Use '-' to read
the index from STDIN.`,
the index from STDIN. If a seed is invalid, by default the extract operation will be
aborted. With the -skip-invalid-seeds, the invalid seeds will be discarded and the
extraction will continue without them. Otherwise with the -regenerate-invalid-seeds,
the eventual invalid seed indexes will be regenerated, in memory, by using the
available data, and neither data nor indexes will be changed on disk.`,
Example: ` desync extract -s http://192.168.1.1/ -c /path/to/local file.caibx largefile.bin
desync extract -s /mnt/store -s /tmp/other/store file.tar.caibx file.tar
desync extract -s /mnt/store --seed /mnt/v1.caibx v2.caibx v2.vmdk`,
Expand All @@ -52,6 +57,7 @@ the index from STDIN.`,
flags.StringSliceVar(&opt.seeds, "seed", nil, "seed indexes")
flags.StringSliceVar(&opt.seedDirs, "seed-dir", nil, "directory with seed index files")
flags.BoolVar(&opt.skipInvalidSeeds, "skip-invalid-seeds", false, "Skip seeds with invalid chunks")
flags.BoolVar(&opt.regenerateInvalidSeeds, "regenerate-invalid-seeds", false, "Regenerate seed indexes with invalid chunks")
flags.StringVarP(&opt.cache, "cache", "c", "", "store to be used as cache")
flags.BoolVarP(&opt.inPlace, "in-place", "k", false, "extract the file in place and keep it in case of error")
flags.BoolVarP(&opt.printStats, "print-stats", "", false, "print statistics")
Expand All @@ -75,6 +81,10 @@ func runExtract(ctx context.Context, opt extractOptions, args []string) error {
return errors.New("no store provided")
}

if opt.skipInvalidSeeds && opt.regenerateInvalidSeeds {
return errors.New("is not possible to use at the same time --skip-invalid-seeds and --regenerate-invalid-seeds")
}

// Parse the store locations, open the stores and add a cache is requested
var s desync.Store
s, err := MultiStoreWithCache(opt.cmdStoreOptions, opt.cache, opt.stores...)
Expand Down Expand Up @@ -106,6 +116,8 @@ func runExtract(ctx context.Context, opt extractOptions, args []string) error {
invalidSeedAction := desync.InvalidSeedActionBailOut
if opt.skipInvalidSeeds {
invalidSeedAction = desync.InvalidSeedActionSkip
} else if opt.regenerateInvalidSeeds {
invalidSeedAction = desync.InvalidSeedActionRegenerate
}
assembleOpt := desync.AssembleOptions{N: opt.n, InvalidSeedAction: invalidSeedAction}

Expand Down
18 changes: 17 additions & 1 deletion cmd/desync/extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,23 @@ func TestExtractCommand(t *testing.T) {
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1},
// Here we don't need the `--skip-invalid-seeds` because we expect the blob1 seed to always be the chosen one, being
// a 1:1 match with the index that we want to write. So we never reach the point where we validate the corrupted seed.
// Explicitly set blob1 seed because seed-dir skips a seed if it's the same index file we gave in input.
{"extract with seed directory without skipping invalid seeds",
[]string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "testdata/blob1.caibx"}, out1},
[]string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
// Same as above, no need for `--skip-invalid-seeds`
{"extract with multiple corrupted seeds",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
{"extract with single seed that has all the expected chunks",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
// blob2_corrupted is a corrupted blob that doesn't match its seed index. We regenerate the seed index to match
// this corrupted blob
{"extract while regenerating the corrupted seed",
[]string{"--store", "testdata/blob1.store", "--seed", "testdata/blob2_corrupted.caibx", "--regenerate-invalid-seeds", "testdata/blob1.caibx"}, out1},
// blob1_corrupted_index.caibx is a corrupted seed index that points to a valid blob1 file. By regenerating the
// invalid seed we expect to have an index that is equal to blob1.caibx. That should be enough to do the
// extraction without taking chunks from the store
{"extract with corrupted seed and empty store",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob1_corrupted_index.caibx", "--regenerate-invalid-seeds", "testdata/blob1.caibx"}, out1},
} {
t.Run(test.name, func(t *testing.T) {
cmd := newExtractCommand(context.Background())
Expand Down Expand Up @@ -125,6 +135,12 @@ func TestExtractWithInvalidSeeds(t *testing.T) {
[]string{"--store", "testdata/blob1.store", "--seed", "testdata/blob2_corrupted.caibx", "testdata/blob1.caibx"}, out},
{"extract with multiple corrupted seeds",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob2.caibx"}, out},
{"extract with corrupted blob1 seed and a valid seed",
[]string{"--store", "testdata/blob2.store", "--seed", "testdata/blob1_corrupted_index.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob2.caibx"}, out},
{"extract with corrupted blob1 seed",
[]string{"--store", "testdata/blob2.store", "--seed", "testdata/blob1_corrupted_index.caibx", "testdata/blob2.caibx"}, out},
{"extract with both --regenerate-invalid-seed and --skip-invalid-seeds",
[]string{"--store", "testdata/blob1.store", "--seed", "testdata/blob1_corrupted_index.caibx", "--regenerate-invalid-seeds", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out},
} {
t.Run(test.name, func(t *testing.T) {
cmd := newExtractCommand(context.Background())
Expand Down
1 change: 1 addition & 0 deletions cmd/desync/testdata/blob1_corrupted_index
Binary file added cmd/desync/testdata/blob1_corrupted_index.caibx
Binary file not shown.
24 changes: 24 additions & 0 deletions fileseed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package desync

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -63,12 +64,35 @@ func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
return max, newFileSeedSegment(s.srcFile, match, s.canReflink)
}

// RegenerateIndex re-chunks the seed file on disk and replaces the seed's
// index with the result, so the index matches whatever the file currently
// contains. It then clears the invalid flag and rebuilds the chunk-position
// lookup table. The chunking parameters (min/avg/max chunk size) are taken
// from the previous index.
// NOTE(review): s.index and s.pos are written without holding s.mu, unlike
// the invalid flag — presumably no concurrent readers at this point; confirm.
func (s *FileSeed) RegenerateIndex(ctx context.Context, n int) error {
	params := s.index.Index
	newIndex, _, err := IndexFromFile(ctx, s.srcFile, n,
		params.ChunkSizeMin, params.ChunkSizeAvg, params.ChunkSizeMax, nil)
	if err != nil {
		return err
	}

	s.index = newIndex
	s.SetInvalid(false)

	// Rebuild the chunk ID -> positions map for the fresh index.
	positions := make(map[ChunkID][]int, len(newIndex.Chunks))
	for i, chunk := range newIndex.Chunks {
		positions[chunk.ID] = append(positions[chunk.ID], i)
	}
	s.pos = positions

	return nil
}

// SetInvalid marks the seed as invalid (or valid again, with false).
// The flag is protected by s.mu.
func (s *FileSeed) SetInvalid(value bool) {
	s.mu.Lock()
	s.isInvalid = value
	s.mu.Unlock()
}

// IsInvalid reports whether the seed has been marked invalid.
// The flag is protected by s.mu.
func (s *FileSeed) IsInvalid() bool {
	s.mu.Lock()
	invalid := s.isInvalid
	s.mu.Unlock()
	return invalid
}

// Returns a slice of chunks from the seed. Compares chunks from position 0
// with seed chunks starting at p.
func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int) []IndexChunk {
Expand Down
10 changes: 10 additions & 0 deletions nullseed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package desync

import (
"context"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -64,10 +65,19 @@ func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment)
}
}

// RegenerateIndex panics: a null-chunk seed is synthetic, has no backing
// file, and can never be invalid, so regeneration is a programming error.
func (s *nullChunkSeed) RegenerateIndex(ctx context.Context, n int) error {
	panic("A nullseed can't be regenerated")
}

// SetInvalid panics: a null-chunk seed cannot become invalid, so any
// attempt to flag it indicates a bug in the caller.
func (s *nullChunkSeed) SetInvalid(value bool) {
	panic("A nullseed is never expected to be invalid")
}

// IsInvalid always returns false: a null-chunk seed is synthetic and is
// never expected to be invalid.
func (s *nullChunkSeed) IsInvalid() bool {
	// A nullseed is never expected to be invalid
	return false
}

type nullChunkSection struct {
from, to uint64
blockfile *os.File
Expand Down
3 changes: 3 additions & 0 deletions seed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package desync

import (
"context"
"os"
)

Expand All @@ -12,7 +13,9 @@ const DefaultBlockSize = 4096
// existing chunks or blocks into the target from.
type Seed interface {
LongestMatchWith(chunks []IndexChunk) (int, SeedSegment)
RegenerateIndex(ctx context.Context, n int) error
SetInvalid(value bool)
IsInvalid() bool
}

// SeedSegment represents a matching range between a Seed and a file being
Expand Down
10 changes: 10 additions & 0 deletions selfseed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package desync

import (
"context"
"sync"
)

Expand Down Expand Up @@ -78,6 +79,15 @@ func (s *selfSeed) getChunk(id ChunkID) SeedSegment {
return newFileSeedSegment(s.file, s.index.Chunks[first:first+1], s.canReflink)
}

// RegenerateIndex panics: the self-seed mirrors the file being assembled
// and can never be invalid, so regeneration is a programming error.
func (s *selfSeed) RegenerateIndex(ctx context.Context, n int) error {
	panic("A selfSeed can't be regenerated")
}

// SetInvalid panics: a self-seed cannot become invalid, so any attempt to
// flag it indicates a bug in the caller.
func (s *selfSeed) SetInvalid(value bool) {
	panic("A selfSeed is never expected to be invalid")
}

// IsInvalid always returns false: a self-seed is never expected to be
// invalid.
func (s *selfSeed) IsInvalid() bool {
	// A selfSeed is never expected to be invalid
	return false
}
12 changes: 12 additions & 0 deletions sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ func (r *SeedSequencer) Rewind() {
r.current = 0
}

// RegenerateInvalidSeeds rebuilds the index of every seed that was flagged
// invalid during validation, bringing each index back in sync with the seed
// data actually present. It stops and returns the first regeneration error.
func (r *SeedSequencer) RegenerateInvalidSeeds(ctx context.Context, n int) error {
	for _, seed := range r.seeds {
		if !seed.IsInvalid() {
			continue
		}
		if err := seed.RegenerateIndex(ctx, n); err != nil {
			return err
		}
	}
	return nil
}

// Validate validates a proposed plan by checking if all the chosen chunks
// are correctly provided from the seeds. In case a seed has invalid chunks, the
// entire seed is marked as invalid and an error is returned.
Expand Down

0 comments on commit 47914b8

Please sign in to comment.