diff --git a/internal/filewatcher/filewatcher.go b/internal/filewatcher/filewatcher.go index 4fce5e9aba41..2a659d6d262f 100644 --- a/internal/filewatcher/filewatcher.go +++ b/internal/filewatcher/filewatcher.go @@ -90,7 +90,6 @@ func (fw *fileWatcher) Add(path string) error { return err } -// Stop watching a path func (fw *fileWatcher) Remove(path string) error { fw.mu.Lock() defer fw.mu.Unlock() diff --git a/internal/filewatcher/filewatcher_test.go b/internal/filewatcher/filewatcher_test.go index 5230d7c05ad3..6c7b52b0878b 100644 --- a/internal/filewatcher/filewatcher_test.go +++ b/internal/filewatcher/filewatcher_test.go @@ -295,27 +295,23 @@ func TestBadAddWatcher(t *testing.T) { func TestDuplicateAdd(t *testing.T) { w := NewWatcher() - name := newWatchFile(t) + defer func() { + _ = w.Close() + _ = os.Remove(name) + }() - if err := w.Add(name); err != nil { - t.Errorf("Expecting nil, got %v", err) - } - - if err := w.Add(name); err == nil { - t.Errorf("Expecting error, got nil") - } - - _ = w.Close() + require.NoError(t, w.Add(name)) + require.Error(t, w.Add(name)) } func TestBogusRemove(t *testing.T) { w := NewWatcher() - name := newWatchFile(t) - if err := w.Remove(name); err == nil { - t.Errorf("Expecting error, got nil") - } + defer func() { + _ = w.Close() + _ = os.Remove(name) + }() - _ = w.Close() + require.Error(t, w.Remove(name)) } diff --git a/internal/provider/file/file.go b/internal/provider/file/file.go index 79ccd04e7634..64cd82853839 100644 --- a/internal/provider/file/file.go +++ b/internal/provider/file/file.go @@ -13,32 +13,34 @@ import ( "github.com/fsnotify/fsnotify" "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/healthz" egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/envoyproxy/gateway/internal/envoygateway/config" + "github.com/envoyproxy/gateway/internal/filewatcher" "github.com/envoyproxy/gateway/internal/message" + "github.com/envoyproxy/gateway/internal/utils/path" ) type Provider struct { paths []string logger logr.Logger - notifier *Notifier + watcher filewatcher.FileWatcher resourcesStore *resourcesStore } func New(svr *config.Server, resources *message.ProviderResources) (*Provider, error) { logger := svr.Logger.Logger - - notifier, err := NewNotifier(logger) - if err != nil { - return nil, err + paths := sets.New[string]() + if svr.EnvoyGateway.Provider.Custom.Resource.File != nil { + paths.Insert(svr.EnvoyGateway.Provider.Custom.Resource.File.Paths...) } return &Provider{ - paths: svr.EnvoyGateway.Provider.Custom.Resource.File.Paths, + paths: paths.UnsortedList(), logger: logger, - notifier: notifier, + watcher: filewatcher.NewWatcher(), resourcesStore: newResourcesStore(svr.EnvoyGateway.Gateway.ControllerName, resources, logger), }, nil } @@ -48,28 +50,40 @@ func (p *Provider) Type() egv1a1.ProviderType { } func (p *Provider) Start(ctx context.Context) error { - dirs, files, err := getDirsAndFilesForWatcher(p.paths) - if err != nil { - return fmt.Errorf("failed to get directories and files for the watcher: %w", err) - } + defer func() { + _ = p.watcher.Close() + }() // Start runnable servers. go p.startHealthProbeServer(ctx) + dirs, files := path.ListDirsAndFiles(p.paths) // Initially load resources from paths on host. - if err = p.resourcesStore.LoadAndStore(files.UnsortedList(), dirs.UnsortedList()); err != nil { + if err := p.resourcesStore.LoadAndStore(files.UnsortedList(), dirs.UnsortedList()); err != nil { return fmt.Errorf("failed to load resources into store: %w", err) } - // Start watchers in notifier. - p.notifier.Watch(ctx, dirs, files) - defer p.notifier.Close() + // aggregate all path channel into one + aggCh := make(chan fsnotify.Event) + for _, path := range p.paths { + if err := p.watcher.Add(path); err != nil { + p.logger.Error(err, "failed to add watch", "path", path) + } + p.logger.Info("Watching file changed", "path", path) + ch := p.watcher.Events(path) + go func(c chan fsnotify.Event) { + for msg := range c { + aggCh <- msg + } + }(ch) + } for { select { case <-ctx.Done(): return nil - case event := <-p.notifier.Events: + case event := <-aggCh: + p.logger.Info("file changed", "op", event.Op, "name", event.Name) switch event.Op { case fsnotify.Create: dirs.Insert(event.Name) @@ -77,6 +91,9 @@ func (p *Provider) Start(ctx context.Context) error { case fsnotify.Remove: dirs.Delete(event.Name) files.Delete(event.Name) + default: + // do nothing + continue } p.resourcesStore.HandleEvent(event, files.UnsortedList(), dirs.UnsortedList()) diff --git a/internal/provider/file/notifier.go b/internal/provider/file/notifier.go deleted file mode 100644 index fca8465e3afe..000000000000 --- a/internal/provider/file/notifier.go +++ /dev/null @@ -1,316 +0,0 @@ -// Copyright Envoy Gateway Authors -// SPDX-License-Identifier: Apache-2.0 -// The full text of the Apache license is available in the LICENSE file at -// the root of the repo. - -package file - -import ( - "context" - "os" - "path/filepath" - "strings" - "time" - - "github.com/fsnotify/fsnotify" - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/util/sets" -) - -const ( - defaultCleanUpRemoveEventsPeriod = 300 * time.Millisecond -) - -type Notifier struct { - // Events record events used to update ResourcesStore, - // which only include two types of events: Write/Remove. - Events chan fsnotify.Event - - filesWatcher *fsnotify.Watcher - dirsWatcher *fsnotify.Watcher - cleanUpRemoveEventsPeriod time.Duration - - logger logr.Logger -} - -func NewNotifier(logger logr.Logger) (*Notifier, error) { - fw, err := fsnotify.NewBufferedWatcher(10) - if err != nil { - return nil, err - } - - dw, err := fsnotify.NewBufferedWatcher(10) - if err != nil { - return nil, err - } - - return &Notifier{ - Events: make(chan fsnotify.Event), - filesWatcher: fw, - dirsWatcher: dw, - cleanUpRemoveEventsPeriod: defaultCleanUpRemoveEventsPeriod, - logger: logger, - }, nil -} - -func (n *Notifier) Watch(ctx context.Context, dirs, files sets.Set[string]) { - n.watchDirs(ctx, dirs) - n.watchFiles(ctx, files) -} - -func (n *Notifier) Close() error { - if err := n.filesWatcher.Close(); err != nil { - return err - } - if err := n.dirsWatcher.Close(); err != nil { - return err - } - return nil -} - -// watchFiles watches one or more files, but instead of watching the file directly, -// it watches its parent directory. This solves various issues where files are -// frequently renamed. -func (n *Notifier) watchFiles(ctx context.Context, files sets.Set[string]) { - if len(files) < 1 { - return - } - - go n.runFilesWatcher(ctx, files) - - for p := range files { - if err := n.filesWatcher.Add(filepath.Dir(p)); err != nil { - n.logger.Error(err, "error adding file to notifier", "path", p) - - continue - } - } -} - -func (n *Notifier) runFilesWatcher(ctx context.Context, files sets.Set[string]) { - var ( - cleanUpTicker = time.NewTicker(n.cleanUpRemoveEventsPeriod) - - // This map records the exact previous Op of one event. - preEventOp = make(map[string]fsnotify.Op) - // This set records the name of event that related to Remove Op. - curRemoveEvents = sets.NewString() - ) - - for { - select { - case <-ctx.Done(): - return - - case err, ok := <-n.filesWatcher.Errors: - if !ok { - return - } - n.logger.Error(err, "error from files watcher in notifier") - - case event, ok := <-n.filesWatcher.Events: - if !ok { - return - } - - // Ignore file and operation the watcher not interested in. - if !files.Has(event.Name) || event.Has(fsnotify.Chmod) { - continue - } - - // This logic is trying to avoid files be removed and then created - // frequently by considering Remove/Rename and the follow Create - // Op as one Write Notifier.Event. - // - // Actually, this approach is also suitable for commands like vi/vim. - // It creates a temporary file, removes the existing one and replace - // it with the temporary file when file is saved. So instead of Write - // Op, the watcher will receive Rename and Create Op. - - var writeEvent bool - switch event.Op { - case fsnotify.Create: - if op, ok := preEventOp[event.Name]; ok && - op.Has(fsnotify.Rename) || op.Has(fsnotify.Remove) { - writeEvent = true - // If the exact previous Op of Create is Rename/Remove, - // then consider them as a Write Notifier.Event instead of Remove. - curRemoveEvents.Delete(event.Name) - } - case fsnotify.Write: - writeEvent = true - case fsnotify.Remove, fsnotify.Rename: - curRemoveEvents.Insert(event.Name) - } - - if writeEvent { - n.logger.Info("sending write event", - "name", event.Name, "watcher", "files") - - n.Events <- fsnotify.Event{ - Name: event.Name, - Op: fsnotify.Write, - } - } - preEventOp[event.Name] = event.Op - - case <-cleanUpTicker.C: - // As for collected Remove Notifier.Event, clean them up - // in a period of time to avoid neglect of dealing with - // Remove/Rename Op. - for e := range curRemoveEvents { - n.logger.Info("sending remove event", - "name", e, "watcher", "files") - - n.Events <- fsnotify.Event{ - Name: e, - Op: fsnotify.Remove, - } - } - curRemoveEvents = sets.NewString() - } - } -} - -// watchDirs watches one or more directories. -func (n *Notifier) watchDirs(ctx context.Context, dirs sets.Set[string]) { - if len(dirs) < 1 { - return - } - - // This map maintains the subdirectories ignored by each directory. - ignoredSubDirs := make(map[string]sets.Set[string]) - - for p := range dirs { - if err := n.dirsWatcher.Add(p); err != nil { - n.logger.Error(err, "error adding dir to notifier", "path", p) - - continue - } - - // Find current exist subdirectories to init ignored subdirectories set. - entries, err := os.ReadDir(p) - if err != nil { - n.logger.Error(err, "error reading dir in notifier", "path", p) - - if err = n.dirsWatcher.Remove(p); err != nil { - n.logger.Error(err, "error removing dir from notifier", "path", p) - } - - continue - } - - ignoredSubDirs[p] = sets.New[string]() - for _, entry := range entries { - if entry.IsDir() { - // The entry name is dir name, not dir path. - ignoredSubDirs[p].Insert(entry.Name()) - } - } - } - - go n.runDirsWatcher(ctx, ignoredSubDirs) -} - -func (n *Notifier) runDirsWatcher(ctx context.Context, ignoredSubDirs map[string]sets.Set[string]) { - var ( - cleanUpTicker = time.NewTicker(n.cleanUpRemoveEventsPeriod) - - // This map records the exact previous Op of one event. - preEventOp = make(map[string]fsnotify.Op) - // This set records the name of event that related to Remove Op. - curRemoveEvents = sets.NewString() - ) - - for { - select { - case <-ctx.Done(): - return - - case err, ok := <-n.dirsWatcher.Errors: - if !ok { - return - } - n.logger.Error(err, "error from dirs watcher in notifier") - - case event, ok := <-n.dirsWatcher.Events: - if !ok { - return - } - - // Ignore the hidden or temporary file related event. - _, name := filepath.Split(event.Name) - if event.Has(fsnotify.Chmod) || - strings.HasPrefix(name, ".") || - strings.HasSuffix(name, "~") { - continue - } - - // Ignore any subdirectory related event. - switch event.Op { - case fsnotify.Create: - if fi, err := os.Lstat(event.Name); err == nil && fi.IsDir() { - parentDir := filepath.Dir(event.Name) - if _, ok := ignoredSubDirs[parentDir]; ok { - ignoredSubDirs[parentDir].Insert(name) - continue - } - } - case fsnotify.Remove, fsnotify.Rename: - parentDir := filepath.Dir(event.Name) - if sub, ok := ignoredSubDirs[parentDir]; ok && sub.Has(name) { - ignoredSubDirs[parentDir].Delete(name) - continue - } - } - - // Share the similar logic as in files watcher. - var writeEvent bool - switch event.Op { - case fsnotify.Create: - if op, ok := preEventOp[event.Name]; ok && - op.Has(fsnotify.Rename) || op.Has(fsnotify.Remove) { - curRemoveEvents.Delete(event.Name) - } - // Since the watcher watches the whole dir, the creation of file - // should also be able to trigger the Write event. - writeEvent = true - case fsnotify.Write: - writeEvent = true - case fsnotify.Remove, fsnotify.Rename: - curRemoveEvents.Insert(event.Name) - } - - if writeEvent { - n.logger.Info("sending write event", - "name", event.Name, "watcher", "dirs") - - n.Events <- fsnotify.Event{ - Name: event.Name, - Op: fsnotify.Write, - } - } - preEventOp[event.Name] = event.Op - - case <-cleanUpTicker.C: - // Merge files to be removed in the same parent directory - // to suppress events, because the file has already been - // removed and is unnecessary to send event for each of them. - parentDirs := sets.NewString() - for e := range curRemoveEvents { - parentDirs.Insert(filepath.Dir(e)) - } - - for parentDir := range parentDirs { - n.logger.Info("sending remove event", - "name", parentDir, "watcher", "dirs") - - n.Events <- fsnotify.Event{ - Name: parentDir, - Op: fsnotify.Remove, - } - } - curRemoveEvents = sets.NewString() - } - } -} diff --git a/internal/provider/file/path.go b/internal/provider/file/path.go deleted file mode 100644 index fe3ad7539f61..000000000000 --- a/internal/provider/file/path.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright Envoy Gateway Authors -// SPDX-License-Identifier: Apache-2.0 -// The full text of the Apache license is available in the LICENSE file at -// the root of the repo. - -package file - -import ( - "os" - "path/filepath" - - "k8s.io/apimachinery/pkg/util/sets" -) - -// getDirsAndFilesForWatcher prepares dirs and files for the watcher in notifier. -func getDirsAndFilesForWatcher(paths []string) ( - dirs sets.Set[string], files sets.Set[string], err error, -) { - dirs, files = sets.New[string](), sets.New[string]() - - // Separate paths by whether is a directory or not. - paths = sets.NewString(paths...).List() - for _, path := range paths { - var p os.FileInfo - p, err = os.Lstat(path) - if err != nil { - return - } - - if p.IsDir() { - dirs.Insert(path) - } else { - files.Insert(path) - } - } - - // Ignore filepath if its parent directory is also be watched. - var ignoreFiles []string - for fp := range files { - if dirs.Has(filepath.Dir(fp)) { - ignoreFiles = append(ignoreFiles, fp) - } - } - files.Delete(ignoreFiles...) - - return -} diff --git a/internal/provider/file/testdata/paths/dir/bar b/internal/provider/file/testdata/paths/dir/bar deleted file mode 100644 index e1878797a7c6..000000000000 --- a/internal/provider/file/testdata/paths/dir/bar +++ /dev/null @@ -1 +0,0 @@ -THIS FILE IS FOR TEST ONLY \ No newline at end of file diff --git a/internal/provider/file/testdata/paths/foo b/internal/provider/file/testdata/paths/foo deleted file mode 100644 index e1878797a7c6..000000000000 --- a/internal/provider/file/testdata/paths/foo +++ /dev/null @@ -1 +0,0 @@ -THIS FILE IS FOR TEST ONLY \ No newline at end of file diff --git a/internal/utils/path/path.go b/internal/utils/path/path.go index e333a7f59718..4291dd588482 100644 --- a/internal/utils/path/path.go +++ b/internal/utils/path/path.go @@ -8,6 +8,8 @@ package path import ( "os" "path/filepath" + + "k8s.io/apimachinery/pkg/util/sets" ) // ValidateOutputPath takes an output file path and returns it as an absolute path. @@ -22,3 +24,35 @@ func ValidateOutputPath(outputPath string) (string, error) { } return outputPath, nil } + +// ListDirsAndFiles return a list of directories and files from a list of paths recursively. +func ListDirsAndFiles(paths []string) (dirs sets.Set[string], files sets.Set[string]) { + dirs, files = sets.New[string](), sets.New[string]() + // Separate paths by whether is a directory or not. + paths = sets.NewString(paths...).UnsortedList() + for _, path := range paths { + var p os.FileInfo + p, err := os.Lstat(path) + if err != nil { + // skip + continue + } + + if p.IsDir() { + dirs.Insert(path) + } else { + files.Insert(path) + } + } + + // Ignore filepath if its parent directory is also be watched. + var ignoreFiles []string + for fp := range files { + if dirs.Has(filepath.Dir(fp)) { + ignoreFiles = append(ignoreFiles, fp) + } + } + files.Delete(ignoreFiles...) + + return +} diff --git a/internal/provider/file/path_test.go b/internal/utils/path/path_test.go similarity index 51% rename from internal/provider/file/path_test.go rename to internal/utils/path/path_test.go index 183c24efa978..8b3db14784d7 100644 --- a/internal/provider/file/path_test.go +++ b/internal/utils/path/path_test.go @@ -3,17 +3,28 @@ // The full text of the Apache license is available in the LICENSE file at // the root of the repo. -package file +package path import ( + "os" "path" "testing" "github.com/stretchr/testify/require" ) -func TestGetDirsAndFilesForWatcher(t *testing.T) { - testPath := path.Join("testdata", "paths") +func TestListDirsAndFiles(t *testing.T) { + basePath, _ := os.MkdirTemp(os.TempDir(), "list-test") + defer func() { + _ = os.RemoveAll(basePath) + }() + paths, err := os.MkdirTemp(basePath, "paths") + require.NoError(t, err) + dirPath, err := os.MkdirTemp(paths, "dir") + require.NoError(t, err) + require.NoError(t, os.WriteFile(path.Join(paths, "foo"), []byte("foo"), 0o700)) // nolint: gosec + require.NoError(t, os.WriteFile(path.Join(dirPath, "bar"), []byte("bar"), 0o700)) // nolint: gosec + testCases := []struct { name string paths []string @@ -23,22 +34,23 @@ func TestGetDirsAndFilesForWatcher(t *testing.T) { { name: "get file and dir path", paths: []string{ - path.Join(testPath, "dir"), path.Join(testPath, "foo"), + dirPath, + path.Join(paths, "foo"), }, expectDirs: []string{ - path.Join(testPath, "dir"), + dirPath, }, expectFiles: []string{ - path.Join(testPath, "foo"), + path.Join(paths, "foo"), }, }, { name: "overlap file path will be ignored", paths: []string{ - path.Join(testPath, "dir"), path.Join(testPath, "dir", "bar"), + dirPath, path.Join(dirPath, "bar"), }, expectDirs: []string{ - path.Join(testPath, "dir"), + dirPath, }, expectFiles: []string{}, }, @@ -46,9 +58,9 @@ func TestGetDirsAndFilesForWatcher(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - dirs, paths, _ := getDirsAndFilesForWatcher(tc.paths) + dirs, files := ListDirsAndFiles(tc.paths) require.ElementsMatch(t, dirs.UnsortedList(), tc.expectDirs) - require.ElementsMatch(t, paths.UnsortedList(), tc.expectFiles) + require.ElementsMatch(t, files.UnsortedList(), tc.expectFiles) }) } }