From 4fe1971b2dff1fa14cb8f5be47aed7fda76c0f7c Mon Sep 17 00:00:00 2001 From: Ian Lance Taylor Date: Tue, 4 Oct 2022 14:35:39 -0700 Subject: [PATCH] os: use poll.fdMutex for Plan 9 files This permits us to safely support concurrent access to files on Plan 9. Concurrent access was already safe on other systems. This does introduce a change: if one goroutine calls a blocking read on a pipe, and another goroutine closes the pipe, then before this CL the close would occur. Now the close will be delayed until the blocking read completes. Also add tests that concurrent I/O and Close on a pipe are OK. For #50436 For #56043 Change-Id: I969c869ea3b8c5c2f2ef319e441a56a3c64e7bf5 Reviewed-on: https://go-review.googlesource.com/c/go/+/438347 Reviewed-by: Bryan Mills Reviewed-by: Ian Lance Taylor Reviewed-by: David du Colombier <0intro@gmail.com> Run-TryBot: Ian Lance Taylor Auto-Submit: Ian Lance Taylor TryBot-Result: Gopher Robot Reviewed-by: Rob Pike --- src/internal/poll/export_test.go | 12 ++-- src/internal/poll/fd_mutex_test.go | 12 ++-- src/internal/poll/file_plan9.go | 42 +++++++++++ src/os/file_mutex_plan9.go | 70 ++++++++++++++++++ src/os/file_plan9.go | 80 +++++++++++++++++---- src/os/os_test.go | 112 +++++++++++++++++++++++++++++ src/os/stat_plan9.go | 4 ++ src/os/types_plan9.go | 2 - 8 files changed, 306 insertions(+), 28 deletions(-) create mode 100644 src/internal/poll/file_plan9.go create mode 100644 src/os/file_mutex_plan9.go diff --git a/src/internal/poll/export_test.go b/src/internal/poll/export_test.go index 02664d9ea3069..66d7c3274bed6 100644 --- a/src/internal/poll/export_test.go +++ b/src/internal/poll/export_test.go @@ -10,26 +10,26 @@ package poll var Consume = consume -type FDMutex struct { +type XFDMutex struct { fdMutex } -func (mu *FDMutex) Incref() bool { +func (mu *XFDMutex) Incref() bool { return mu.incref() } -func (mu *FDMutex) IncrefAndClose() bool { +func (mu *XFDMutex) IncrefAndClose() bool { return mu.increfAndClose() } -func (mu *FDMutex) Decref() bool { +func (mu *XFDMutex) Decref() bool { return mu.decref() } -func (mu *FDMutex) RWLock(read bool) bool { +func (mu *XFDMutex) RWLock(read bool) bool { return mu.rwlock(read) } -func (mu *FDMutex) RWUnlock(read bool) bool { +func (mu *XFDMutex) RWUnlock(read bool) bool { return mu.rwunlock(read) } diff --git a/src/internal/poll/fd_mutex_test.go b/src/internal/poll/fd_mutex_test.go index 3029b9a6811f1..62f953192d751 100644 --- a/src/internal/poll/fd_mutex_test.go +++ b/src/internal/poll/fd_mutex_test.go @@ -14,7 +14,7 @@ import ( ) func TestMutexLock(t *testing.T) { - var mu FDMutex + var mu XFDMutex if !mu.Incref() { t.Fatal("broken") @@ -39,7 +39,7 @@ func TestMutexLock(t *testing.T) { } func TestMutexClose(t *testing.T) { - var mu FDMutex + var mu XFDMutex if !mu.IncrefAndClose() { t.Fatal("broken") } @@ -60,7 +60,7 @@ func TestMutexClose(t *testing.T) { func TestMutexCloseUnblock(t *testing.T) { c := make(chan bool, 4) - var mu FDMutex + var mu XFDMutex mu.RWLock(true) for i := 0; i < 4; i++ { go func() { @@ -104,7 +104,7 @@ func TestMutexPanic(t *testing.T) { f() } - var mu FDMutex + var mu XFDMutex ensurePanics(func() { mu.Decref() }) ensurePanics(func() { mu.RWUnlock(true) }) ensurePanics(func() { mu.RWUnlock(false) }) @@ -137,7 +137,7 @@ func TestMutexOverflowPanic(t *testing.T) { } }() - var mu1 FDMutex + var mu1 XFDMutex for i := 0; i < 1<<21; i++ { mu1.Incref() } @@ -152,7 +152,7 @@ func TestMutexStress(t *testing.T) { } defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P)) done := make(chan bool, P) - var mu FDMutex + var mu XFDMutex var readState [2]uint64 var writeState [2]uint64 for p := 0; p < P; p++ { diff --git a/src/internal/poll/file_plan9.go b/src/internal/poll/file_plan9.go new file mode 100644 index 0000000000000..57dc0c668f1d8 --- /dev/null +++ b/src/internal/poll/file_plan9.go @@ -0,0 +1,42 @@ +// Copyright 2022 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll + +// Expose fdMutex for use by the os package on Plan 9. +// On Plan 9 we don't want to use async I/O for file operations, +// but we still want the locking semantics that fdMutex provides. + +// FDMutex is an exported fdMutex, only for Plan 9. +type FDMutex struct { + fdmu fdMutex +} + +func (fdmu *FDMutex) Incref() bool { + return fdmu.fdmu.incref() +} + +func (fdmu *FDMutex) Decref() bool { + return fdmu.fdmu.decref() +} + +func (fdmu *FDMutex) IncrefAndClose() bool { + return fdmu.fdmu.increfAndClose() +} + +func (fdmu *FDMutex) ReadLock() bool { + return fdmu.fdmu.rwlock(true) +} + +func (fdmu *FDMutex) ReadUnlock() bool { + return fdmu.fdmu.rwunlock(true) +} + +func (fdmu *FDMutex) WriteLock() bool { + return fdmu.fdmu.rwlock(false) +} + +func (fdmu *FDMutex) WriteUnlock() bool { + return fdmu.fdmu.rwunlock(false) +} diff --git a/src/os/file_mutex_plan9.go b/src/os/file_mutex_plan9.go new file mode 100644 index 0000000000000..26bf5a7d1e5fc --- /dev/null +++ b/src/os/file_mutex_plan9.go @@ -0,0 +1,70 @@ +// Copyright 2022 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package os + +// File locking support for Plan 9. This uses fdMutex from the +// internal/poll package. + +// incref adds a reference to the file. It returns an error if the file +// is already closed. This method is on File so that we can incorporate +// a nil test. +func (f *File) incref(op string) (err error) { + if f == nil { + return ErrInvalid + } + if !f.fdmu.Incref() { + err = ErrClosed + if op != "" { + err = &PathError{Op: op, Path: f.name, Err: err} + } + } + return err +} + +// decref removes a reference to the file. If this is the last +// remaining reference, and the file has been marked to be closed, +// then actually close it. +func (file *file) decref() error { + if file.fdmu.Decref() { + return file.destroy() + } + return nil +} + +// readLock adds a reference to the file and locks it for reading. +// It returns an error if the file is already closed. +func (file *file) readLock() error { + if !file.fdmu.ReadLock() { + return ErrClosed + } + return nil +} + +// readUnlock removes a reference from the file and unlocks it for reading. +// It also closes the file if it marked as closed and there is no remaining +// reference. +func (file *file) readUnlock() { + if file.fdmu.ReadUnlock() { + file.destroy() + } +} + +// writeLock adds a reference to the file and locks it for writing. +// It returns an error if the file is already closed. +func (file *file) writeLock() error { + if !file.fdmu.WriteLock() { + return ErrClosed + } + return nil +} + +// writeUnlock removes a reference from the file and unlocks it for writing. +// It also closes the file if it is marked as closed and there is no remaining +// reference. +func (file *file) writeUnlock() { + if file.fdmu.WriteUnlock() { + file.destroy() + } +} diff --git a/src/os/file_plan9.go b/src/os/file_plan9.go index 93eb233e004a7..7a4a562783ee1 100644 --- a/src/os/file_plan9.go +++ b/src/os/file_plan9.go @@ -22,6 +22,7 @@ func fixLongPath(path string) string { // can overwrite this data, which could cause the finalizer // to close the wrong file descriptor. type file struct { + fdmu poll.FDMutex fd int name string dirinfo *dirInfo // nil unless directory being read @@ -142,24 +143,35 @@ func openFileNolog(name string, flag int, perm FileMode) (*File, error) { // be canceled and return immediately with an ErrClosed error. // Close will return an error if it has already been called. func (f *File) Close() error { - if err := f.checkValid("close"); err != nil { - return err + if f == nil { + return ErrInvalid } return f.file.close() } func (file *file) close() error { - if file == nil || file.fd == badFd { - return ErrInvalid + if !file.fdmu.IncrefAndClose() { + return &PathError{Op: "close", Path: file.name, Err: ErrClosed} } + + // At this point we should cancel any pending I/O. + // How do we do that on Plan 9? + + err := file.decref() + + // no need for a finalizer anymore + runtime.SetFinalizer(file, nil) + return err +} + +// destroy actually closes the descriptor. This is called when +// there are no remaining references, by the decref, readUnlock, +// and writeUnlock methods. +func (file *file) destroy() error { var err error if e := syscall.Close(file.fd); e != nil { err = &PathError{Op: "close", Path: file.name, Err: e} } - file.fd = badFd // so it can't be closed again - - // no need for a finalizer anymore - runtime.SetFinalizer(file, nil) return err } @@ -193,6 +205,12 @@ func (f *File) Truncate(size int64) error { if err != nil { return &PathError{Op: "truncate", Path: f.name, Err: err} } + + if err := f.incref("truncate"); err != nil { + return err + } + defer f.decref() + if err = syscall.Fwstat(f.fd, buf[:n]); err != nil { return &PathError{Op: "truncate", Path: f.name, Err: err} } @@ -219,6 +237,12 @@ func (f *File) chmod(mode FileMode) error { if err != nil { return &PathError{Op: "chmod", Path: f.name, Err: err} } + + if err := f.incref("chmod"); err != nil { + return err + } + defer f.decref() + if err = syscall.Fwstat(f.fd, buf[:n]); err != nil { return &PathError{Op: "chmod", Path: f.name, Err: err} } @@ -240,6 +264,12 @@ func (f *File) Sync() error { if err != nil { return &PathError{Op: "sync", Path: f.name, Err: err} } + + if err := f.incref("sync"); err != nil { + return err + } + defer f.decref() + if err = syscall.Fwstat(f.fd, buf[:n]); err != nil { return &PathError{Op: "sync", Path: f.name, Err: err} } @@ -249,6 +279,10 @@ func (f *File) Sync() error { // read reads up to len(b) bytes from the File. // It returns the number of bytes read and an error, if any. func (f *File) read(b []byte) (n int, err error) { + if err := f.readLock(); err != nil { + return 0, err + } + defer f.readUnlock() n, e := fixCount(syscall.Read(f.fd, b)) if n == 0 && len(b) > 0 && e == nil { return 0, io.EOF @@ -260,6 +294,10 @@ func (f *File) read(b []byte) (n int, err error) { // It returns the number of bytes read and the error, if any. // EOF is signaled by a zero count with err set to nil. func (f *File) pread(b []byte, off int64) (n int, err error) { + if err := f.readLock(); err != nil { + return 0, err + } + defer f.readUnlock() n, e := fixCount(syscall.Pread(f.fd, b, off)) if n == 0 && len(b) > 0 && e == nil { return 0, io.EOF @@ -272,6 +310,10 @@ func (f *File) pread(b []byte, off int64) (n int, err error) { // Since Plan 9 preserves message boundaries, never allow // a zero-byte write. func (f *File) write(b []byte) (n int, err error) { + if err := f.writeLock(); err != nil { + return 0, err + } + defer f.writeUnlock() if len(b) == 0 { return 0, nil } @@ -283,6 +325,10 @@ func (f *File) write(b []byte) (n int, err error) { // Since Plan 9 preserves message boundaries, never allow // a zero-byte write. func (f *File) pwrite(b []byte, off int64) (n int, err error) { + if err := f.writeLock(); err != nil { + return 0, err + } + defer f.writeUnlock() if len(b) == 0 { return 0, nil } @@ -294,6 +340,10 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) { // relative to the current offset, and 2 means relative to the end. // It returns the new offset and an error, if any. func (f *File) seek(offset int64, whence int) (ret int64, err error) { + if err := f.incref(""); err != nil { + return 0, err + } + defer f.decref() if f.dirinfo != nil { // Free cached dirinfo, so we allocate a new one if we // access this file as a directory again. See #35767 and #37161. @@ -493,9 +543,10 @@ func tempDir() string { // which must be a directory. // If there is an error, it will be of type *PathError. func (f *File) Chdir() error { - if err := f.checkValid("chdir"); err != nil { + if err := f.incref("chdir"); err != nil { return err } + defer f.decref() if e := syscall.Fchdir(f.fd); e != nil { return &PathError{Op: "chdir", Path: f.name, Err: e} } @@ -526,16 +577,17 @@ func (f *File) setWriteDeadline(time.Time) error { return poll.ErrNoDeadline } -// checkValid checks whether f is valid for use. -// If not, it returns an appropriate error, perhaps incorporating the operation name op. +// checkValid checks whether f is valid for use, but does not prepare +// to actually use it. If f is not ready checkValid returns an appropriate +// error, perhaps incorporating the operation name op. func (f *File) checkValid(op string) error { if f == nil { return ErrInvalid } - if f.fd == badFd { - return &PathError{Op: op, Path: f.name, Err: ErrClosed} + if err := f.incref(op); err != nil { + return err } - return nil + return f.decref() } type rawConn struct{} diff --git a/src/os/os_test.go b/src/os/os_test.go index ff74598362027..550b7db5a3327 100644 --- a/src/os/os_test.go +++ b/src/os/os_test.go @@ -2799,3 +2799,115 @@ func TestWriteStringAlloc(t *testing.T) { t.Errorf("expected 0 allocs for File.WriteString, got %v", allocs) } } + +// Test that it's OK to have parallel I/O and Close on a pipe. +func TestPipeIOCloseRace(t *testing.T) { + // Skip on wasm, which doesn't have pipes. + if runtime.GOOS == "js" { + t.Skip("skipping on js: no pipes") + } + + r, w, err := Pipe() + if err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(3) + + go func() { + defer wg.Done() + for { + n, err := w.Write([]byte("hi")) + if err != nil { + // We look at error strings as the + // expected errors are OS-specific. + switch { + case errors.Is(err, ErrClosed), + strings.Contains(err.Error(), "broken pipe"), + strings.Contains(err.Error(), "pipe is being closed"), + strings.Contains(err.Error(), "hungup channel"): + // Ignore an expected error. + default: + // Unexpected error. + t.Error(err) + } + return + } + if n != 2 { + t.Errorf("wrote %d bytes, expected 2", n) + return + } + } + }() + + go func() { + defer wg.Done() + for { + var buf [2]byte + n, err := r.Read(buf[:]) + if err != nil { + if err != io.EOF && !errors.Is(err, ErrClosed) { + t.Error(err) + } + return + } + if n != 2 { + t.Errorf("read %d bytes, want 2", n) + } + } + }() + + go func() { + defer wg.Done() + + // Let the other goroutines start. This is just to get + // a better test, the test will still pass if they + // don't start. + time.Sleep(time.Millisecond) + + if err := r.Close(); err != nil { + t.Error(err) + } + if err := w.Close(); err != nil { + t.Error(err) + } + }() + + wg.Wait() +} + +// Test that it's OK to call Close concurrently on a pipe. +func TestPipeCloseRace(t *testing.T) { + // Skip on wasm, which doesn't have pipes. + if runtime.GOOS == "js" { + t.Skip("skipping on js: no pipes") + } + + r, w, err := Pipe() + if err != nil { + t.Fatal(err) + } + var wg sync.WaitGroup + c := make(chan error, 4) + f := func() { + defer wg.Done() + c <- r.Close() + c <- w.Close() + } + wg.Add(2) + go f() + go f() + nils, errs := 0, 0 + for i := 0; i < 4; i++ { + err := <-c + if err == nil { + nils++ + } else { + errs++ + } + } + if nils != 2 || errs != 2 { + t.Errorf("got nils %d errs %d, want 2 2", nils, errs) + } +} diff --git a/src/os/stat_plan9.go b/src/os/stat_plan9.go index e20accf191321..a5e9901379aea 100644 --- a/src/os/stat_plan9.go +++ b/src/os/stat_plan9.go @@ -56,7 +56,11 @@ func dirstat(arg any) (*syscall.Dir, error) { switch a := arg.(type) { case *File: name = a.name + if err := a.incref("fstat"); err != nil { + return nil, err + } n, err = syscall.Fstat(a.fd, buf) + a.decref() case string: name = a n, err = syscall.Stat(a, buf) diff --git a/src/os/types_plan9.go b/src/os/types_plan9.go index ccf4fd932e7a5..adb40130855a8 100644 --- a/src/os/types_plan9.go +++ b/src/os/types_plan9.go @@ -28,5 +28,3 @@ func sameFile(fs1, fs2 *fileStat) bool { b := fs2.sys.(*syscall.Dir) return a.Qid.Path == b.Qid.Path && a.Type == b.Type && a.Dev == b.Dev } - -const badFd = -1