Skip to content

Commit

Permalink
Merge pull request docker#10791 from milas/watch-refactor-sync
Browse files Browse the repository at this point in the history
watch: move sync logic into separate package
  • Loading branch information
glours authored Jul 19, 2023
2 parents 9174a99 + cb17c3c commit 8318f66
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 75 deletions.
107 changes: 107 additions & 0 deletions internal/sync/docker_cp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2023 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sync

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"

"github.com/compose-spec/compose-go/types"
"github.com/docker/compose/v2/pkg/api"
"github.com/sirupsen/logrus"
)

type ComposeClient interface {
Exec(ctx context.Context, projectName string, options api.RunOptions) (int, error)

Copy(ctx context.Context, projectName string, options api.CopyOptions) error
}

type DockerCopy struct {
client ComposeClient

projectName string

infoWriter io.Writer
}

var _ Syncer = &DockerCopy{}

func NewDockerCopy(projectName string, client ComposeClient, infoWriter io.Writer) *DockerCopy {
return &DockerCopy{
projectName: projectName,
client: client,
infoWriter: infoWriter,
}
}

func (d *DockerCopy) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error {
var errs []error
for i := range paths {
if err := d.sync(ctx, service, paths[i]); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}

func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, pathMapping PathMapping) error {
scale := 1
if service.Deploy != nil && service.Deploy.Replicas != nil {
scale = int(*service.Deploy.Replicas)
}

if fi, statErr := os.Stat(pathMapping.HostPath); statErr == nil {
if fi.IsDir() {
for i := 1; i <= scale; i++ {
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
Service: pathMapping.Service,
Command: []string{"mkdir", "-p", pathMapping.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
}
}
fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath)
} else {
err := d.client.Copy(ctx, d.projectName, api.CopyOptions{
Source: pathMapping.HostPath,
Destination: fmt.Sprintf("%s:%s", pathMapping.Service, pathMapping.ContainerPath),
})
if err != nil {
return err
}
fmt.Fprintf(d.infoWriter, "%s updated\n", pathMapping.ContainerPath)
}
} else if errors.Is(statErr, fs.ErrNotExist) {
for i := 1; i <= scale; i++ {
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
Service: pathMapping.Service,
Command: []string{"rm", "-rf", pathMapping.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
}
}
fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath)
}
return nil
}
44 changes: 44 additions & 0 deletions internal/sync/shared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright 2023 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sync

import (
"context"

"github.com/compose-spec/compose-go/types"
)

// PathMapping contains the Compose service and modified host system path.
type PathMapping struct {
// Service that the file event is for.
Service string
// HostPath that was created/modified/deleted outside the container.
//
// This is the path as seen from the user's perspective, e.g.
// - C:\Users\moby\Documents\hello-world\main.go (file on Windows)
// - /Users/moby/Documents/hello-world (directory on macOS)
HostPath string
// ContainerPath for the target file inside the container (only populated
// for sync events, not rebuild).
//
// This is the path as used in Docker CLI commands, e.g.
// - /workdir/main.go
// - /workdir/subdir
ContainerPath string
}

type Syncer interface {
Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error
}
100 changes: 30 additions & 70 deletions pkg/compose/watch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Expand All @@ -17,13 +17,13 @@ package compose
import (
"context"
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/docker/compose/v2/internal/sync"

"github.com/compose-spec/compose-go/types"
"github.com/jonboulle/clockwork"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -54,11 +54,8 @@ type Trigger struct {

const quietPeriod = 2 * time.Second

// fileMapping contains the Compose service and modified host system path.
//
// For file sync, the container path is also included.
// For rebuild, there is no container path, so it is always empty.
type fileMapping struct {
// fileEvent contains the Compose service and modified host system path.
type fileEvent struct {
// Service that the file event is for.
Service string
// HostPath that was created/modified/deleted outside the container.
Expand All @@ -67,17 +64,11 @@ type fileMapping struct {
// - C:\Users\moby\Documents\hello-world\main.go
// - /Users/moby/Documents/hello-world/main.go
HostPath string
// ContainerPath for the target file inside the container (only populated
// for sync events, not rebuild).
//
// This is the path as used in Docker CLI commands, e.g.
// - /workdir/main.go
ContainerPath string
}

func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint: gocyclo
needRebuild := make(chan fileMapping)
needSync := make(chan fileMapping)
needRebuild := make(chan fileEvent)
needSync := make(chan sync.PathMapping)

_, err := s.prepareProjectForBuild(project, nil)
if err != nil {
Expand Down Expand Up @@ -175,7 +166,7 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
return eg.Wait()
}

func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan fileMapping, needRebuild chan fileMapping) error {
func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan sync.PathMapping, needRebuild chan fileEvent) error {
ignores := make([]watch.PathMatcher, len(triggers))
for i, trigger := range triggers {
ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
Expand Down Expand Up @@ -209,24 +200,25 @@ WATCH:

fmt.Fprintf(s.stdinfo(), "change detected on %s\n", hostPath)

f := fileMapping{
HostPath: hostPath,
Service: name,
}

switch trigger.Action {
case WatchActionSync:
logrus.Debugf("modified file %s triggered sync", hostPath)
rel, err := filepath.Rel(trigger.Path, hostPath)
if err != nil {
return err
}
// always use Unix-style paths for inside the container
f.ContainerPath = path.Join(trigger.Target, rel)
needSync <- f
needSync <- sync.PathMapping{
Service: name,
HostPath: hostPath,
// always use Unix-style paths for inside the container
ContainerPath: path.Join(trigger.Target, rel),
}
case WatchActionRebuild:
logrus.Debugf("modified file %s requires image to be rebuilt", hostPath)
needRebuild <- f
needRebuild <- fileEvent{
HostPath: hostPath,
Service: name,
}
default:
return fmt.Errorf("watch action %q is not supported", trigger)
}
Expand Down Expand Up @@ -304,57 +296,25 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
}
}

func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error {
func (s *composeService) makeSyncFn(
ctx context.Context,
project *types.Project,
needSync <-chan sync.PathMapping,
) func() error {
syncer := sync.NewDockerCopy(project.Name, s, s.stdinfo())

return func() error {
for {
select {
case <-ctx.Done():
return nil
case opt := <-needSync:
service, err := project.GetService(opt.Service)
case pathMapping := <-needSync:
service, err := project.GetService(pathMapping.Service)
if err != nil {
return err
}
scale := 1
if service.Deploy != nil && service.Deploy.Replicas != nil {
scale = int(*service.Deploy.Replicas)
}

if fi, statErr := os.Stat(opt.HostPath); statErr == nil {
if fi.IsDir() {
for i := 1; i <= scale; i++ {
_, err := s.Exec(ctx, project.Name, api.RunOptions{
Service: opt.Service,
Command: []string{"mkdir", "-p", opt.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to create %q from %s: %v", opt.ContainerPath, opt.Service, err)
}
}
fmt.Fprintf(s.stdinfo(), "%s created\n", opt.ContainerPath)
} else {
err := s.Copy(ctx, project.Name, api.CopyOptions{
Source: opt.HostPath,
Destination: fmt.Sprintf("%s:%s", opt.Service, opt.ContainerPath),
})
if err != nil {
return err
}
fmt.Fprintf(s.stdinfo(), "%s updated\n", opt.ContainerPath)
}
} else if errors.Is(statErr, fs.ErrNotExist) {
for i := 1; i <= scale; i++ {
_, err := s.Exec(ctx, project.Name, api.RunOptions{
Service: opt.Service,
Command: []string{"rm", "-rf", opt.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to delete %q from %s: %v", opt.ContainerPath, opt.Service, err)
}
}
fmt.Fprintf(s.stdinfo(), "%s deleted from service\n", opt.ContainerPath)
if err := syncer.Sync(ctx, service, []sync.PathMapping{pathMapping}); err != nil {
return err
}
}
}
Expand All @@ -363,7 +323,7 @@ func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project,

type rebuildServices map[string]utils.Set[string]

func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) {
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent, fn func(services rebuildServices)) {
services := make(rebuildServices)
t := clock.NewTimer(delay)
defer t.Stop()
Expand Down
12 changes: 7 additions & 5 deletions pkg/compose/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"testing"
"time"

"github.com/docker/compose/v2/internal/sync"

"github.com/docker/cli/cli/command"
"github.com/docker/compose/v2/pkg/watch"
"github.com/jonboulle/clockwork"
Expand All @@ -27,7 +29,7 @@ import (
)

func Test_debounce(t *testing.T) {
ch := make(chan fileMapping)
ch := make(chan fileEvent)
var (
ran int
got []string
Expand All @@ -47,7 +49,7 @@ func Test_debounce(t *testing.T) {
return nil
})
for i := 0; i < 100; i++ {
ch <- fileMapping{Service: "test"}
ch <- fileEvent{Service: "test"}
}
assert.Equal(t, ran, 0)
clock.Advance(quietPeriod)
Expand Down Expand Up @@ -79,8 +81,8 @@ func (t testWatcher) Errors() chan error {
}

func Test_sync(t *testing.T) {
needSync := make(chan fileMapping)
needRebuild := make(chan fileMapping)
needSync := make(chan sync.PathMapping)
needRebuild := make(chan fileEvent)
ctx, cancelFunc := context.WithCancel(context.TODO())
defer cancelFunc()

Expand Down Expand Up @@ -119,7 +121,7 @@ func Test_sync(t *testing.T) {
watcher.Events() <- watch.NewFileEvent("/src/changed")
select {
case actual := <-needSync:
assert.DeepEqual(t, fileMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual)
assert.DeepEqual(t, sync.PathMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual)
case <-time.After(100 * time.Millisecond):
t.Error("timeout")
}
Expand Down

0 comments on commit 8318f66

Please sign in to comment.