diff --git a/task/copy.go b/task/copy.go index a84d9f4..2c02d66 100644 --- a/task/copy.go +++ b/task/copy.go @@ -3,9 +3,9 @@ package task import ( "context" "fmt" + "io" ps "github.com/beyondstorage/go-storage/v4/pairs" - "github.com/beyondstorage/go-storage/v4/pkg/iowrap" "github.com/beyondstorage/go-storage/v4/types" protobuf "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -150,9 +150,15 @@ func (rn *runner) HandleCopySingleFile(ctx context.Context, msg protobuf.Message src := rn.storages[arg.Src] dst := rn.storages[arg.Dst] - r, w := iowrap.Pipe() + r, w := io.Pipe() go func() { + defer func() { + err := w.Close() + if err != nil { + logger.Error("close pipe writer", zap.Error(err)) + } + }() _, err := src.Read(arg.SrcPath, w) if err != nil { logger.Error("src read failed", zap.Error(err)) @@ -165,13 +171,6 @@ func (rn *runner) HandleCopySingleFile(ctx context.Context, msg protobuf.Message return err } - defer func() { - err = r.Close() - if err != nil { - return - } - }() - logger.Info("copy single file", zap.String("from", arg.SrcPath), zap.String("to", arg.DstPath)) @@ -293,9 +292,15 @@ func (rn *runner) HandleCopyMultipart(ctx context.Context, msg protobuf.Message) return fmt.Errorf("not supported") } - r, w := iowrap.Pipe() + r, w := io.Pipe() go func() { + defer func() { + err := w.Close() + if err != nil { + logger.Error("close pipe writer", zap.Error(err)) + } + }() _, err := src.Read(arg.SrcPath, w, ps.WithSize(arg.Size), ps.WithOffset(arg.Offset)) if err != nil { logger.Error("src read", @@ -311,13 +316,6 @@ func (rn *runner) HandleCopyMultipart(ctx context.Context, msg protobuf.Message) return err } - defer func() { - err = r.Close() - if err != nil { - return - } - }() - result, _ := protobuf.Marshal(&models.WriteMultipartJobMetadata{ Etag: part.ETag, })