Skip to content

Commit

Permalink
Merge pull request #123 from otiai10/feature/concurrent-dcopy
Browse files Browse the repository at this point in the history
Feature/concurrent dcopy
  • Loading branch information
otiai10 authored Oct 2, 2023
2 parents f0f65b5 + 4118a68 commit fba066a
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 16 deletions.
19 changes: 18 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ jobs:
os: [ubuntu-latest, macos-latest, windows-latest]
go: ['1.18', '1.19', '1.20']
steps:

- name: Set up Go
uses: actions/setup-go@v4
with:
Expand All @@ -34,3 +33,21 @@ jobs:

- name: Test
run: go test -v --tags=go${{ matrix.go }}
benchmark:
name: Benchmark
runs-on: ubuntu-latest
strategy:
matrix:
go: ['1.21']
steps:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go }}
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v3
- name: Get dependencies
run: go get -v -t -d ./...
- name: Benchmark
run: go test -bench . -benchmem -benchtime 8s
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ type Options struct {
// If given, copy.Copy refers to this fs.FS instead of the OS filesystem.
// e.g., You can use embed.FS to copy files from embedded filesystem.
FS fs.FS

// NumOfWorkers represents the number of workers used for
// concurrent copying contents of directories.
// If 0 or 1, it does not use goroutine for copying directories.
// Please refer to https://pkg.go.dev/golang.org/x/sync/semaphore for more details.
NumOfWorkers int64

// PreferConcurrent is a function to determine whether or not
// to use goroutine for copying contents of directories.
// If PreferConcurrent is nil, which is default, it does concurrent
// copying for all directories.
// If NumOfWorkers is 0 or 1, this function will be ignored.
PreferConcurrent func(srcdir, destdir string) (bool, error)
}
```

Expand All @@ -105,4 +118,4 @@ err := Copy("your/directory", "your/directory.copy", opt)


## License
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fotiai10%2Fcopy.svg?type=large)](https://app.fossa.com/projects/git%2Bgithub.com%2Fotiai10%2Fcopy?ref=badge_large)
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fotiai10%2Fcopy.svg?type=large)](https://app.fossa.com/projects/git%2Bgithub.com%2Fotiai10%2Fcopy?ref=badge_large)
14 changes: 14 additions & 0 deletions all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,3 +475,17 @@ func (r *SleepyReader) Read(p []byte) (int, error) {
}
return n, e
}

func TestOptions_NumOfWorkers(t *testing.T) {
opt := Options{NumOfWorkers: 3}
err := Copy("test/data/case19", "test/data.copy/case19", opt)
Expect(t, err).ToBe(nil)
}

func TestOptions_PreferConcurrent(t *testing.T) {
opt := Options{NumOfWorkers: 4, PreferConcurrent: func(sd, dd string) (bool, error) {
return strings.HasSuffix(sd, "concurrent"), nil
}}
err := Copy("test/data/case19", "test/data.copy/case19_preferconcurrent", opt)
Expect(t, err).ToBe(nil)
}
38 changes: 38 additions & 0 deletions benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package copy

import (
"fmt"
"testing"
)

func BenchmarkOptions_NumOfWorkers_0(b *testing.B) {
var num int64 = 0 // 0 or 1 = single-threaded
opt := Options{NumOfWorkers: num}
for i := 0; i < b.N; i++ {
Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt)
}
}

func BenchmarkOptions_NumOfWorkers_2(b *testing.B) {
var num int64 = 2
opt := Options{NumOfWorkers: num}
for i := 0; i < b.N; i++ {
Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt)
}
}

func BenchmarkOptions_NumOfWorkers_4(b *testing.B) {
var num int64 = 4
opt := Options{NumOfWorkers: num}
for i := 0; i < b.N; i++ {
Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt)
}
}

func BenchmarkOptions_NumOfWorkers_8(b *testing.B) {
var num int64 = 8
opt := Options{NumOfWorkers: num}
for i := 0; i < b.N; i++ {
Copy("test/data/case19", fmt.Sprintf("test/data.copy/case19-%d-%d", num, i), opt)
}
}
59 changes: 53 additions & 6 deletions copy.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package copy

import (
"context"
"io"
"io/fs"
"io/ioutil"
"os"
"path/filepath"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

type timespec struct {
Expand All @@ -18,6 +22,10 @@ type timespec struct {
// Copy copies src to dest, doesn't matter if src is a directory or a file.
func Copy(src, dest string, opts ...Options) error {
opt := assureOptions(src, dest, opts...)
if opt.NumOfWorkers > 1 {
opt.intent.sem = semaphore.NewWeighted(opt.NumOfWorkers)
opt.intent.ctx = context.Background()
}
if opt.FS != nil {
info, err := fs.Stat(opt.FS, src)
if err != nil {
Expand Down Expand Up @@ -183,12 +191,15 @@ func dcopy(srcdir, destdir string, info os.FileInfo, opt Options) (err error) {
return
}

for _, content := range contents {
cs, cd := filepath.Join(srcdir, content.Name()), filepath.Join(destdir, content.Name())

if err = copyNextOrSkip(cs, cd, content, opt); err != nil {
// If any error, exit immediately
return
if yes, err := shouldCopyDirectoryConcurrent(opt, srcdir, destdir); err != nil {
return err
} else if yes {
if err := dcopyConcurrent(srcdir, destdir, contents, opt); err != nil {
return err
}
} else {
if err := dcopySequential(srcdir, destdir, contents, opt); err != nil {
return err
}
}

Expand All @@ -207,6 +218,42 @@ func dcopy(srcdir, destdir string, info os.FileInfo, opt Options) (err error) {
return
}

func dcopySequential(srcdir, destdir string, contents []os.FileInfo, opt Options) error {
for _, content := range contents {
cs, cd := filepath.Join(srcdir, content.Name()), filepath.Join(destdir, content.Name())

if err := copyNextOrSkip(cs, cd, content, opt); err != nil {
// If any error, exit immediately
return err
}
}
return nil
}

// Copy this directory concurrently regarding semaphore of opt.intent
func dcopyConcurrent(srcdir, destdir string, contents []os.FileInfo, opt Options) error {
group, ctx := errgroup.WithContext(opt.intent.ctx)
getRoutine := func(cs, cd string, content os.FileInfo) func() error {
return func() error {
if content.IsDir() {
return copyNextOrSkip(cs, cd, content, opt)
}
if err := opt.intent.sem.Acquire(ctx, 1); err != nil {
return err
}
err := copyNextOrSkip(cs, cd, content, opt)
opt.intent.sem.Release(1)
return err
}
}
for _, content := range contents {
csd := filepath.Join(srcdir, content.Name())
cdd := filepath.Join(destdir, content.Name())
group.Go(getRoutine(csd, cdd, content))
}
return group.Wait()
}

func onDirExists(opt Options, srcdir, destdir string) (bool, error) {
_, err := os.Stat(destdir)
if err == nil && opt.OnDirExists != nil && destdir != opt.intent.dest {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ go 1.18

require (
github.com/otiai10/mint v1.5.1
golang.org/x/sync v0.3.0
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks=
github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
44 changes: 36 additions & 8 deletions options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package copy

import (
"context"
"io"
"io/fs"
"os"

"golang.org/x/sync/semaphore"
)

// Options specifies optional actions on copying.
Expand Down Expand Up @@ -65,10 +68,28 @@ type Options struct {
// e.g., You can use embed.FS to copy files from embedded filesystem.
FS fs.FS

intent struct {
src string
dest string
}
// NumOfWorkers represents the number of workers used for
// concurrent copying contents of directories.
// If 0 or 1, it does not use goroutine for copying directories.
// Please refer to https://pkg.go.dev/golang.org/x/sync/semaphore for more details.
NumOfWorkers int64

// PreferConcurrent is a function to determine whether or not
// to use goroutine for copying contents of directories.
// If PreferConcurrent is nil, which is default, it does concurrent
// copying for all directories.
// If NumOfWorkers is 0 or 1, this function will be ignored.
PreferConcurrent func(srcdir, destdir string) (bool, error)

// Internal use only
intent intent
}

type intent struct {
src string
dest string
sem *semaphore.Weighted
ctx context.Context
}

// SymlinkAction represents what to do on symlink.
Expand Down Expand Up @@ -112,10 +133,7 @@ func getDefaultOptions(src, dest string) Options {
PreserveTimes: false, // Do not preserve the modification time
CopyBufferSize: 0, // Do not specify, use default bufsize (32*1024)
WrapReader: nil, // Do not wrap src files, use them as they are.
intent: struct {
src string
dest string
}{src, dest},
intent: intent{src, dest, nil, nil},
}
}

Expand All @@ -141,3 +159,13 @@ func assureOptions(src, dest string, opts ...Options) Options {
opts[0].intent.dest = defopt.intent.dest
return opts[0]
}

func shouldCopyDirectoryConcurrent(opt Options, srcdir, destdir string) (bool, error) {
if opt.NumOfWorkers <= 1 {
return false, nil
}
if opt.PreferConcurrent == nil {
return true, nil
}
return opt.PreferConcurrent(srcdir, destdir)
}
1 change: 1 addition & 0 deletions test/data/case19/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Concurrent case
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.

0 comments on commit fba066a

Please sign in to comment.