
Commit

Merge pull request #3712 from ipfs/kevina/more-robust-gc
More Robust GC
whyrusleeping authored Mar 22, 2017
2 parents d065593 + d39d9ed commit 21072a5
Showing 6 changed files with 365 additions and 89 deletions.
3 changes: 3 additions & 0 deletions commands/channelmarshaler.go
@@ -25,6 +25,9 @@ func (cr *ChannelMarshaler) Read(p []byte) (int, error) {
if err != nil {
return 0, err
}
if r == nil {
return 0, nil
}
cr.reader = r
}

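The nil check added in this hunk matters because the text marshaler for "repo gc" later in this commit returns (nil, nil) after writing an error message to stderr; without the guard, Read would go on to call Read on a nil reader and panic. A minimal, self-contained sketch of the same pattern, using a simplified stand-in rather than the actual ChannelMarshaler:

package main

import (
	"fmt"
	"io"
	"strings"
)

// drain pulls readers off a channel the way ChannelMarshaler does, skipping
// nil readers instead of dereferencing them.
func drain(readers <-chan io.Reader) string {
	var out strings.Builder
	for r := range readers {
		if r == nil {
			// The marshal step produced no output for this value; keep going.
			continue
		}
		io.Copy(&out, r)
	}
	return out.String()
}

func main() {
	ch := make(chan io.Reader, 3)
	ch <- strings.NewReader("removed QmA\n")
	ch <- nil // e.g. the marshaler wrote its message to stderr and returned (nil, nil)
	ch <- strings.NewReader("removed QmB\n")
	close(ch)
	fmt.Print(drain(ch))
}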
55 changes: 41 additions & 14 deletions core/commands/repo.go
@@ -15,6 +15,7 @@ import (
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
lockfile "github.com/ipfs/go-ipfs/repo/fsrepo/lock"

cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
u "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
)

@@ -39,6 +40,12 @@ var RepoCmd = &cmds.Command{
},
}

// GcResult is the result returned by the "repo gc" command.
type GcResult struct {
Key *cid.Cid
Error string `json:",omitempty"`
}

var repoGcCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Perform a garbage collection sweep on the repo.",
@@ -50,6 +57,7 @@ order to reclaim hard disk space.
},
Options: []cmds.Option{
cmds.BoolOption("quiet", "q", "Write minimal output.").Default(false),
cmds.BoolOption("stream-errors", "Stream errors.").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
@@ -58,23 +66,39 @@ order to reclaim hard disk space.
return
}

gcOutChan, err := corerepo.GarbageCollectAsync(n, req.Context())
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
streamErrors, _, _ := res.Request().Option("stream-errors").Bool()

gcOutChan := corerepo.GarbageCollectAsync(n, req.Context())

outChan := make(chan interface{})
outChan := make(chan interface{}, cap(gcOutChan))
res.SetOutput((<-chan interface{})(outChan))

go func() {
defer close(outChan)
for k := range gcOutChan {
outChan <- k
if streamErrors {
errs := false
for res := range gcOutChan {
if res.Error != nil {
outChan <- &GcResult{Error: res.Error.Error()}
errs = true
} else {
outChan <- &GcResult{Key: res.KeyRemoved}
}
}
if errs {
res.SetError(fmt.Errorf("encountered errors during gc run"), cmds.ErrNormal)
}
} else {
err := corerepo.CollectResult(req.Context(), gcOutChan, func(k *cid.Cid) {
outChan <- &GcResult{Key: k}
})
if err != nil {
res.SetError(err, cmds.ErrNormal)
}
}
}()
},
Type: corerepo.KeyRemoved{},
Type: GcResult{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
@@ -88,18 +112,21 @@ order to reclaim hard disk space.
}

marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*corerepo.KeyRemoved)
obj, ok := v.(*GcResult)
if !ok {
return nil, u.ErrCast()
}

buf := new(bytes.Buffer)
if obj.Error != "" {
fmt.Fprintf(res.Stderr(), "Error: %s\n", obj.Error)
return nil, nil
}

if quiet {
buf = bytes.NewBufferString(obj.Key.String() + "\n")
return bytes.NewBufferString(obj.Key.String() + "\n"), nil
} else {
buf = bytes.NewBufferString(fmt.Sprintf("removed %s\n", obj.Key))
return bytes.NewBufferString(fmt.Sprintf("removed %s\n", obj.Key)), nil
}
return buf, nil
}

return &cmds.ChannelMarshaler{
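With the new --stream-errors option each GC failure is emitted on the output stream as its own GcResult rather than only being summarized at the end. Below is a hypothetical client-side decoder for the JSON stream such a run would produce over the HTTP API (assuming the command library's usual --enc=json output); the payload is illustrative, not captured output, and only the Key/Error field names come from the GcResult type above:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// gcResult mirrors the command's GcResult; the CID is kept opaque because its
// JSON shape is whatever go-cid's MarshalJSON emits.
type gcResult struct {
	Key   json.RawMessage `json:",omitempty"`
	Error string          `json:",omitempty"`
}

func main() {
	// Illustrative stream: one removed block, then one streamed error.
	body := strings.NewReader(
		`{"Key":{"/":"QmExampleRemovedBlock"}}` + "\n" +
			`{"Error":"could not remove QmExamplePinned: pinned"}` + "\n")

	dec := json.NewDecoder(body)
	for {
		var r gcResult
		err := dec.Decode(&r)
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println("decode error:", err)
			return
		}
		if r.Error != "" {
			fmt.Println("gc error:", r.Error)
		} else {
			fmt.Println("removed:", string(r.Key))
		}
	}
}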
84 changes: 55 additions & 29 deletions core/corerepo/gc.go
@@ -1,6 +1,7 @@
package corerepo

import (
"bytes"
"context"
"errors"
"time"
@@ -19,10 +20,6 @@ var log = logging.Logger("corerepo")

var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?")

type KeyRemoved struct {
Key *cid.Cid
}

type GC struct {
Node *core.IpfsNode
Repo repo.Repo
@@ -89,46 +86,75 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil {
return err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
if err != nil {
return err
}
rmed := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)

return CollectResult(ctx, rmed, nil)
}

// CollectResult collects the output of a garbage collection run and calls the
// given callback for each object removed. It also collects all errors into a
// MultiError which is returned after the gc is completed.
func CollectResult(ctx context.Context, gcOut <-chan gc.Result, cb func(*cid.Cid)) error {
var errors []error
loop:
for {
select {
case _, ok := <-rmed:
case res, ok := <-gcOut:
if !ok {
return nil
break loop
}
if res.Error != nil {
errors = append(errors, res.Error)
} else if res.KeyRemoved != nil && cb != nil {
cb(res.KeyRemoved)
}
case <-ctx.Done():
return ctx.Err()
errors = append(errors, ctx.Err())
break loop
}
}

switch len(errors) {
case 0:
return nil
case 1:
return errors[0]
default:
return NewMultiError(errors...)
}
}

func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemoved, error) {
roots, err := BestEffortRoots(n.FilesRoot)
if err != nil {
return nil, err
// NewMultiError creates a new MultiError object from a given slice of errors.
func NewMultiError(errs ...error) *MultiError {
return &MultiError{errs[:len(errs)-1], errs[len(errs)-1]}
}

// MultiError contains the results of multiple errors.
type MultiError struct {
Errors []error
Summary error
}

func (e *MultiError) Error() string {
var buf bytes.Buffer
for _, err := range e.Errors {
buf.WriteString(err.Error())
buf.WriteString("; ")
}
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
buf.WriteString(e.Summary.Error())
return buf.String()
}

func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result {
roots, err := BestEffortRoots(n.FilesRoot)
if err != nil {
return nil, err
out := make(chan gc.Result, 1) // buffered so the error send below cannot block
out <- gc.Result{Error: err}
close(out)
return out
}

out := make(chan *KeyRemoved)
go func() {
defer close(out)
for k := range rmed {
select {
case out <- &KeyRemoved{k}:
case <-ctx.Done():
return
}
}
}()
return out, nil
return gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
}

func PeriodicGC(ctx context.Context, node *core.IpfsNode) error {
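For callers embedding go-ipfs, the reworked API splits into a producer and a collector: GarbageCollectAsync hands back the gc.Result channel directly, and CollectResult folds it into a single error (a *MultiError when several operations fail). A minimal sketch of that calling convention; runGC and the core/corerepo import paths are assumptions, while the two function signatures come from this diff:

package gcexample

import (
	"context"
	"fmt"

	core "github.com/ipfs/go-ipfs/core"
	corerepo "github.com/ipfs/go-ipfs/core/corerepo"

	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
)

// runGC drives a full collection on an already-constructed node: the callback
// fires once per removed block, and any errors come back aggregated at the end.
func runGC(ctx context.Context, n *core.IpfsNode) error {
	results := corerepo.GarbageCollectAsync(n, ctx)
	return corerepo.CollectResult(ctx, results, func(k *cid.Cid) {
		fmt.Println("removed", k)
	})
}

Returning a channel unconditionally, even when BestEffortRoots fails, keeps every failure on one path, which is what lets the command layer above choose between streaming errors and aggregating them.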
25 changes: 7 additions & 18 deletions core/coreunix/add_test.go
@@ -59,7 +59,6 @@ func TestAddGCLive(t *testing.T) {
t.Fatal(err)
}

errs := make(chan error)
out := make(chan interface{})
adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
if err != nil {
@@ -99,18 +98,11 @@ func TestAddGCLive(t *testing.T) {
t.Fatal("add shouldnt complete yet")
}

var gcout <-chan *cid.Cid
var gcout <-chan gc.Result
gcstarted := make(chan struct{})
go func() {
defer close(gcstarted)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
if err != nil {
log.Error("GC ERROR:", err)
errs <- err
return
}

gcout = gcchan
gcout = gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
}()

// gc shouldnt start until we let the add finish its current file.
@@ -119,8 +111,6 @@ select {
select {
case <-gcstarted:
t.Fatal("gc shouldnt have started yet")
case err := <-errs:
t.Fatal(err)
default:
}

@@ -133,18 +123,17 @@ select {
select {
case o := <-out:
addedHashes[o.(*AddedObject).Hash] = struct{}{}
case err := <-errs:
t.Fatal(err)
}

select {
case <-gcstarted:
case err := <-errs:
t.Fatal(err)
}

for k := range gcout {
if _, ok := addedHashes[k.String()]; ok {
for r := range gcout {
if r.Error != nil {
t.Fatal(r.Error)
}
if _, ok := addedHashes[r.KeyRemoved.String()]; ok {
t.Fatal("gc'ed a hash we just added")
}
}
(Diffs for the remaining 2 changed files were not loaded.)
