Skip to content

Commit

Permalink
reuse filewatcher
Browse files Browse the repository at this point in the history
Signed-off-by: zirain <[email protected]>
  • Loading branch information
zirain committed Oct 19, 2024
1 parent 5a1c065 commit be0419e
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 407 deletions.
1 change: 0 additions & 1 deletion internal/filewatcher/filewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (fw *fileWatcher) Add(path string) error {
return err
}

// Stop watching a path
func (fw *fileWatcher) Remove(path string) error {
fw.mu.Lock()
defer fw.mu.Unlock()
Expand Down
26 changes: 11 additions & 15 deletions internal/filewatcher/filewatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,27 +295,23 @@ func TestBadAddWatcher(t *testing.T) {

func TestDuplicateAdd(t *testing.T) {
w := NewWatcher()

name := newWatchFile(t)
defer func() {
_ = w.Close()
_ = os.Remove(name)
}()

if err := w.Add(name); err != nil {
t.Errorf("Expecting nil, got %v", err)
}

if err := w.Add(name); err == nil {
t.Errorf("Expecting error, got nil")
}

_ = w.Close()
require.NoError(t, w.Add(name))
require.Error(t, w.Add(name))
}

func TestBogusRemove(t *testing.T) {
w := NewWatcher()

name := newWatchFile(t)
if err := w.Remove(name); err == nil {
t.Errorf("Expecting error, got nil")
}
defer func() {
_ = w.Close()
_ = os.Remove(name)
}()

_ = w.Close()
require.Error(t, w.Remove(name))
}
49 changes: 33 additions & 16 deletions internal/provider/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,34 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/healthz"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/filewatcher"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/utils/path"
)

type Provider struct {
paths []string
logger logr.Logger
notifier *Notifier
watcher filewatcher.FileWatcher
resourcesStore *resourcesStore
}

func New(svr *config.Server, resources *message.ProviderResources) (*Provider, error) {
logger := svr.Logger.Logger

notifier, err := NewNotifier(logger)
if err != nil {
return nil, err
paths := sets.New[string]()
if svr.EnvoyGateway.Provider.Custom.Resource.File != nil {
paths.Insert(svr.EnvoyGateway.Provider.Custom.Resource.File.Paths...)

Check warning on line 37 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L35-L37

Added lines #L35 - L37 were not covered by tests
}

return &Provider{
paths: svr.EnvoyGateway.Provider.Custom.Resource.File.Paths,
paths: paths.UnsortedList(),

Check warning on line 41 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L41

Added line #L41 was not covered by tests
logger: logger,
notifier: notifier,
watcher: filewatcher.NewWatcher(),

Check warning on line 43 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L43

Added line #L43 was not covered by tests
resourcesStore: newResourcesStore(svr.EnvoyGateway.Gateway.ControllerName, resources, logger),
}, nil
}
Expand All @@ -48,35 +50,50 @@ func (p *Provider) Type() egv1a1.ProviderType {
}

func (p *Provider) Start(ctx context.Context) error {
dirs, files, err := getDirsAndFilesForWatcher(p.paths)
if err != nil {
return fmt.Errorf("failed to get directories and files for the watcher: %w", err)
}
defer func() {
_ = p.watcher.Close()
}()

Check warning on line 55 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L53-L55

Added lines #L53 - L55 were not covered by tests

// Start runnable servers.
go p.startHealthProbeServer(ctx)

dirs, files := path.ListDirsAndFiles(p.paths)

Check warning on line 60 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L60

Added line #L60 was not covered by tests
// Initially load resources from paths on host.
if err = p.resourcesStore.LoadAndStore(files.UnsortedList(), dirs.UnsortedList()); err != nil {
if err := p.resourcesStore.LoadAndStore(files.UnsortedList(), dirs.UnsortedList()); err != nil {

Check warning on line 62 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L62

Added line #L62 was not covered by tests
return fmt.Errorf("failed to load resources into store: %w", err)
}

// Start watchers in notifier.
p.notifier.Watch(ctx, dirs, files)
defer p.notifier.Close()
// aggregate all path channel into one
aggCh := make(chan fsnotify.Event)
for _, path := range p.paths {
if err := p.watcher.Add(path); err != nil {
p.logger.Error(err, "failed to add watch", "path", path)
}
p.logger.Info("Watching file changed", "path", path)
ch := p.watcher.Events(path)
go func(c chan fsnotify.Event) {
for msg := range c {
aggCh <- msg
}

Check warning on line 77 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L67-L77

Added lines #L67 - L77 were not covered by tests
}(ch)
}

for {
select {
case <-ctx.Done():
return nil
case event := <-p.notifier.Events:
case event := <-aggCh:
p.logger.Info("file changed", "op", event.Op, "name", event.Name)

Check warning on line 86 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L85-L86

Added lines #L85 - L86 were not covered by tests
switch event.Op {
case fsnotify.Create:
dirs.Insert(event.Name)
files.Insert(event.Name)
case fsnotify.Remove:
dirs.Delete(event.Name)
files.Delete(event.Name)
default:
// do nothing
continue

Check warning on line 96 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L94-L96

Added lines #L94 - L96 were not covered by tests
}

p.resourcesStore.HandleEvent(event, files.UnsortedList(), dirs.UnsortedList())
Expand Down
Loading

0 comments on commit be0419e

Please sign in to comment.