From 3de491f6035a91bf64f7cdd7daf725ea2e4135b5 Mon Sep 17 00:00:00 2001 From: Nathan Youngman Date: Sun, 14 Sep 2014 17:58:38 -0600 Subject: [PATCH] kqueue: less mutexes closes #13 --- kqueue.go | 147 ++++++++++++++++++++++-------------------------------- 1 file changed, 60 insertions(+), 87 deletions(-) diff --git a/kqueue.go b/kqueue.go index 9af190c..4f37b49 100644 --- a/kqueue.go +++ b/kqueue.go @@ -21,24 +21,17 @@ import ( type Watcher struct { Events chan Event Errors chan error + done chan bool // Channel for sending a "quit message" to the reader goroutine - kq int // File descriptor (as returned by the kqueue() syscall). + kq int // File descriptor (as returned by the kqueue() syscall). + + mu sync.Mutex // Protects access to watcher data watches map[string]int // Map of watched file descriptors (key: path). externalWatches map[string]bool // Map of watches added by user of the library. dirFlags map[string]uint32 // Map of watched directories to fflags used in kqueue. paths map[int]pathInfo // Map file descriptors to path names for processing kqueue events. fileExists map[string]bool // Keep track of if we know this file exists (to stop duplicate create events). - done chan bool // Channel for sending a "quit message" to the reader goroutine isClosed bool // Set to true when Close() is first called - - mu sync.Mutex // Mutex for the Watcher itself (isClosed). - - wmut sync.Mutex // Protects access to watches. - pmut sync.Mutex // Protects access to paths. - ewmut sync.Mutex // Protects access to externalWatches. - - dirmut sync.Mutex // Protects access to dirFlags. - femut sync.Mutex // Protects access to fileExists. } type pathInfo struct { @@ -81,9 +74,10 @@ func (w *Watcher) Close() error { // Send "quit" message to the reader goroutine: w.done <- true - w.wmut.Lock() + + w.mu.Lock() ws := w.watches - w.wmut.Unlock() + w.mu.Unlock() for name := range ws { w.Remove(name) } @@ -93,18 +87,18 @@ func (w *Watcher) Close() error { // Add starts watching the named file or directory (non-recursively). func (w *Watcher) Add(name string) error { - w.ewmut.Lock() + w.mu.Lock() w.externalWatches[name] = true - w.ewmut.Unlock() + w.mu.Unlock() return w.addWatch(name, noteAllEvents) } // Remove stops watching the the named file or directory (non-recursively). func (w *Watcher) Remove(name string) error { name = filepath.Clean(name) - w.wmut.Lock() + w.mu.Lock() watchfd, ok := w.watches[name] - w.wmut.Unlock() + w.mu.Unlock() if !ok { return fmt.Errorf("can't remove non-existent kevent watch for: %s", name) } @@ -116,32 +110,26 @@ func (w *Watcher) Remove(name string) error { syscall.Close(watchfd) - w.wmut.Lock() - delete(w.watches, name) - w.wmut.Unlock() - w.dirmut.Lock() - delete(w.dirFlags, name) - w.dirmut.Unlock() - w.pmut.Lock() + w.mu.Lock() isDir := w.paths[watchfd].isDir + delete(w.watches, name) delete(w.paths, watchfd) - w.pmut.Unlock() + delete(w.dirFlags, name) + w.mu.Unlock() // Find all watched paths that are in this directory that are not external. if isDir { var pathsToRemove []string - w.pmut.Lock() + w.mu.Lock() for _, path := range w.paths { wdir, _ := filepath.Split(path.name) - if filepath.Clean(wdir) == filepath.Clean(name) { - w.ewmut.Lock() + if filepath.Clean(wdir) == name { if !w.externalWatches[path.name] { pathsToRemove = append(pathsToRemove, path.name) } - w.ewmut.Unlock() } } - w.pmut.Unlock() + w.mu.Unlock() for _, name := range pathsToRemove { // Since these are internal, not much sense in propagating error // to the user, as that will just confuse them with an error about @@ -162,34 +150,29 @@ var keventWaitTime = durationToTimespec(100 * time.Millisecond) // addWatch adds name to the watched file set. // The flags are interpreted as described in kevent(2). func (w *Watcher) addWatch(name string, flags uint32) error { + var isDir bool + // Make ./name and name equivalent + name = filepath.Clean(name) + w.mu.Lock() if w.isClosed { w.mu.Unlock() return errors.New("kevent instance already closed") } - w.mu.Unlock() - - // Make ./name and name equivalent - name = filepath.Clean(name) - - w.wmut.Lock() watchfd, alreadyWatching := w.watches[name] - w.wmut.Unlock() - - var isDir bool - + // We already have a watch, but we can still override flags. if alreadyWatching { - // We already have a watch, but we can still override flags - w.pmut.Lock() isDir = w.paths[watchfd].isDir - w.pmut.Unlock() - } else { + } + w.mu.Unlock() + + if !alreadyWatching { fi, err := os.Lstat(name) if err != nil { return err } - // don't watch socket + // Don't watch sockets. if fi.Mode()&os.ModeSocket == os.ModeSocket { return nil } @@ -227,24 +210,21 @@ func (w *Watcher) addWatch(name string, flags uint32) error { } if !alreadyWatching { - w.wmut.Lock() + w.mu.Lock() w.watches[name] = watchfd - w.wmut.Unlock() - - w.pmut.Lock() w.paths[watchfd] = pathInfo{name: name, isDir: isDir} - w.pmut.Unlock() + w.mu.Unlock() } if isDir { // Watch the directory if it has not been watched before, // or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles) - w.dirmut.Lock() + w.mu.Lock() watchDir := (flags&syscall.NOTE_WRITE) == syscall.NOTE_WRITE && (!alreadyWatching || (w.dirFlags[name]&syscall.NOTE_WRITE) != syscall.NOTE_WRITE) // Store flags so this watch can be updated later w.dirFlags[name] = flags - w.dirmut.Unlock() + w.mu.Unlock() if watchDir { if err := w.watchDirectoryFiles(name); err != nil { @@ -255,8 +235,8 @@ func (w *Watcher) addWatch(name string, flags uint32) error { return nil } -// readEvents reads from the kqueue file descriptor, converts the -// received events into Event objects and sends them via the Events channel +// readEvents reads from kqueue and converts the received kevents into +// Event values that it sends down the Events channel. func (w *Watcher) readEvents() { eventBuffer := make([]syscall.Kevent_t, 10) @@ -264,9 +244,9 @@ func (w *Watcher) readEvents() { // See if there is a message on the "done" channel select { case <-w.done: - errno := syscall.Close(w.kq) - if errno != nil { - w.Errors <- os.NewSyscallError("close", errno) + err := syscall.Close(w.kq) + if err != nil { + w.Errors <- os.NewSyscallError("close", err) } close(w.Events) close(w.Errors) @@ -284,18 +264,16 @@ func (w *Watcher) readEvents() { // Flush the events we received to the Events channel for len(kevents) > 0 { - watchEvent := &kevents[0] - watchfd := int(watchEvent.Ident) - mask := uint32(watchEvent.Fflags) - - w.pmut.Lock() + kevent := &kevents[0] + watchfd := int(kevent.Ident) + mask := uint32(kevent.Fflags) + w.mu.Lock() path := w.paths[watchfd] - w.pmut.Unlock() - + w.mu.Unlock() event := newEvent(path.name, mask) if path.isDir && !(event.Op&Remove == Remove) { - // Double check to make sure the directory exist. This can happen when + // Double check to make sure the directory exists. This can happen when // we do a rm -fr on a recursively watched folders and we receive a // modification event first but the folder has been deleted and later // receive the delete event @@ -312,28 +290,20 @@ func (w *Watcher) readEvents() { w.Events <- event } - // Move to next event - kevents = kevents[1:] - - if event.Op&Rename == Rename { + if event.Op&Rename == Rename || event.Op&Remove == Remove { w.Remove(event.Name) - w.femut.Lock() + w.mu.Lock() delete(w.fileExists, event.Name) - w.femut.Unlock() + w.mu.Unlock() } if event.Op&Remove == Remove { - w.Remove(event.Name) - w.femut.Lock() - delete(w.fileExists, event.Name) - w.femut.Unlock() - // Look for a file that may have overwritten this. // For example, mv f1 f2 will delete f2, then create f2. fileDir, _ := filepath.Split(event.Name) fileDir = filepath.Clean(fileDir) - w.wmut.Lock() + w.mu.Lock() _, found := w.watches[fileDir] - w.wmut.Unlock() + w.mu.Unlock() if found { // make sure the directory exists before we watch for changes. When we // do a recursive watch and perform rm -fr, the parent directory might @@ -345,6 +315,9 @@ func (w *Watcher) readEvents() { } } } + + // Move to next event + kevents = kevents[1:] } } } @@ -385,9 +358,9 @@ func (w *Watcher) watchDirectoryFiles(dirPath string) error { return err } - w.femut.Lock() + w.mu.Lock() w.fileExists[filePath] = true - w.femut.Unlock() + w.mu.Unlock() } return nil @@ -407,9 +380,9 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) { // Search for new files for _, fileInfo := range files { filePath := filepath.Join(dirPath, fileInfo.Name()) - w.femut.Lock() + w.mu.Lock() _, doesExist := w.fileExists[filePath] - w.femut.Unlock() + w.mu.Unlock() if !doesExist { // Send create event w.Events <- newCreateEvent(filePath) @@ -420,9 +393,9 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) { return } - w.femut.Lock() + w.mu.Lock() w.fileExists[filePath] = true - w.femut.Unlock() + w.mu.Unlock() } } @@ -430,9 +403,9 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) error { if fileInfo.IsDir() { // mimic Linux providing delete events for subdirectories // but preserve the flags used if currently watching subdirectory - w.dirmut.Lock() + w.mu.Lock() flags := w.dirFlags[name] - w.dirmut.Unlock() + w.mu.Unlock() flags |= syscall.NOTE_DELETE return w.addWatch(name, flags) @@ -469,7 +442,7 @@ func register(kq int, fds []int, flags int, fflags uint32) error { return nil } -// read retrieves pending events +// read retrieves pending events, or waits until an event occurs. // A timeout of nil blocks indefinitely, while 0 polls the queue. func read(kq int, events []syscall.Kevent_t, timeout *syscall.Timespec) ([]syscall.Kevent_t, error) { n, err := syscall.Kevent(kq, nil, events, timeout)