Skip to content

Commit

Permalink
os: use poll.fdMutex for Plan 9 files
Browse files Browse the repository at this point in the history
This permits us to safely support concurrent access to files on Plan 9.
Concurrent access was already safe on other systems.

This does introduce a change: if one goroutine calls a blocking read
on a pipe, and another goroutine closes the pipe, then before this CL
the close would occur. Now the close will be delayed until the blocking
read completes.

Also add tests that concurrent I/O and Close on a pipe are OK.

For golang#50436
For golang#56043

Change-Id: I969c869ea3b8c5c2f2ef319e441a56a3c64e7bf5
Reviewed-on: https://go-review.googlesource.com/c/go/+/438347
Reviewed-by: Bryan Mills <[email protected]>
Reviewed-by: Ian Lance Taylor <[email protected]>
Reviewed-by: David du Colombier <[email protected]>
Run-TryBot: Ian Lance Taylor <[email protected]>
Auto-Submit: Ian Lance Taylor <[email protected]>
TryBot-Result: Gopher Robot <[email protected]>
Reviewed-by: Rob Pike <[email protected]>
  • Loading branch information
ianlancetaylor authored and romaindoumenc committed Nov 3, 2022
1 parent 85650a6 commit 2e29ae0
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 28 deletions.
12 changes: 6 additions & 6 deletions src/internal/poll/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@ package poll

var Consume = consume

type FDMutex struct {
type XFDMutex struct {
fdMutex
}

func (mu *FDMutex) Incref() bool {
func (mu *XFDMutex) Incref() bool {
return mu.incref()
}

func (mu *FDMutex) IncrefAndClose() bool {
func (mu *XFDMutex) IncrefAndClose() bool {
return mu.increfAndClose()
}

func (mu *FDMutex) Decref() bool {
func (mu *XFDMutex) Decref() bool {
return mu.decref()
}

func (mu *FDMutex) RWLock(read bool) bool {
func (mu *XFDMutex) RWLock(read bool) bool {
return mu.rwlock(read)
}

func (mu *FDMutex) RWUnlock(read bool) bool {
func (mu *XFDMutex) RWUnlock(read bool) bool {
return mu.rwunlock(read)
}
12 changes: 6 additions & 6 deletions src/internal/poll/fd_mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestMutexLock(t *testing.T) {
var mu FDMutex
var mu XFDMutex

if !mu.Incref() {
t.Fatal("broken")
Expand All @@ -39,7 +39,7 @@ func TestMutexLock(t *testing.T) {
}

func TestMutexClose(t *testing.T) {
var mu FDMutex
var mu XFDMutex
if !mu.IncrefAndClose() {
t.Fatal("broken")
}
Expand All @@ -60,7 +60,7 @@ func TestMutexClose(t *testing.T) {

func TestMutexCloseUnblock(t *testing.T) {
c := make(chan bool, 4)
var mu FDMutex
var mu XFDMutex
mu.RWLock(true)
for i := 0; i < 4; i++ {
go func() {
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestMutexPanic(t *testing.T) {
f()
}

var mu FDMutex
var mu XFDMutex
ensurePanics(func() { mu.Decref() })
ensurePanics(func() { mu.RWUnlock(true) })
ensurePanics(func() { mu.RWUnlock(false) })
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestMutexOverflowPanic(t *testing.T) {
}
}()

var mu1 FDMutex
var mu1 XFDMutex
for i := 0; i < 1<<21; i++ {
mu1.Incref()
}
Expand All @@ -152,7 +152,7 @@ func TestMutexStress(t *testing.T) {
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
done := make(chan bool, P)
var mu FDMutex
var mu XFDMutex
var readState [2]uint64
var writeState [2]uint64
for p := 0; p < P; p++ {
Expand Down
42 changes: 42 additions & 0 deletions src/internal/poll/file_plan9.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package poll

// Expose fdMutex for use by the os package on Plan 9.
// On Plan 9 we don't want to use async I/O for file operations,
// but we still want the locking semantics that fdMutex provides.

// FDMutex is an exported fdMutex, only for Plan 9.
type FDMutex struct {
fdmu fdMutex
}

func (fdmu *FDMutex) Incref() bool {
return fdmu.fdmu.incref()
}

func (fdmu *FDMutex) Decref() bool {
return fdmu.fdmu.decref()
}

func (fdmu *FDMutex) IncrefAndClose() bool {
return fdmu.fdmu.increfAndClose()
}

func (fdmu *FDMutex) ReadLock() bool {
return fdmu.fdmu.rwlock(true)
}

func (fdmu *FDMutex) ReadUnlock() bool {
return fdmu.fdmu.rwunlock(true)
}

func (fdmu *FDMutex) WriteLock() bool {
return fdmu.fdmu.rwlock(false)
}

func (fdmu *FDMutex) WriteUnlock() bool {
return fdmu.fdmu.rwunlock(false)
}
70 changes: 70 additions & 0 deletions src/os/file_mutex_plan9.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package os

// File locking support for Plan 9. This uses fdMutex from the
// internal/poll package.

// incref adds a reference to the file. It returns an error if the file
// is already closed. This method is on File so that we can incorporate
// a nil test.
func (f *File) incref(op string) (err error) {
if f == nil {
return ErrInvalid
}
if !f.fdmu.Incref() {
err = ErrClosed
if op != "" {
err = &PathError{Op: op, Path: f.name, Err: err}
}
}
return err
}

// decref removes a reference to the file. If this is the last
// remaining reference, and the file has been marked to be closed,
// then actually close it.
func (file *file) decref() error {
if file.fdmu.Decref() {
return file.destroy()
}
return nil
}

// readLock adds a reference to the file and locks it for reading.
// It returns an error if the file is already closed.
func (file *file) readLock() error {
if !file.fdmu.ReadLock() {
return ErrClosed
}
return nil
}

// readUnlock removes a reference from the file and unlocks it for reading.
// It also closes the file if it marked as closed and there is no remaining
// reference.
func (file *file) readUnlock() {
if file.fdmu.ReadUnlock() {
file.destroy()
}
}

// writeLock adds a reference to the file and locks it for writing.
// It returns an error if the file is already closed.
func (file *file) writeLock() error {
if !file.fdmu.WriteLock() {
return ErrClosed
}
return nil
}

// writeUnlock removes a reference from the file and unlocks it for writing.
// It also closes the file if it is marked as closed and there is no remaining
// reference.
func (file *file) writeUnlock() {
if file.fdmu.WriteUnlock() {
file.destroy()
}
}
80 changes: 66 additions & 14 deletions src/os/file_plan9.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func fixLongPath(path string) string {
// can overwrite this data, which could cause the finalizer
// to close the wrong file descriptor.
type file struct {
fdmu poll.FDMutex
fd int
name string
dirinfo *dirInfo // nil unless directory being read
Expand Down Expand Up @@ -142,24 +143,35 @@ func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
// be canceled and return immediately with an ErrClosed error.
// Close will return an error if it has already been called.
func (f *File) Close() error {
if err := f.checkValid("close"); err != nil {
return err
if f == nil {
return ErrInvalid
}
return f.file.close()
}

func (file *file) close() error {
if file == nil || file.fd == badFd {
return ErrInvalid
if !file.fdmu.IncrefAndClose() {
return &PathError{Op: "close", Path: file.name, Err: ErrClosed}
}

// At this point we should cancel any pending I/O.
// How do we do that on Plan 9?

err := file.decref()

// no need for a finalizer anymore
runtime.SetFinalizer(file, nil)
return err
}

// destroy actually closes the descriptor. This is called when
// there are no remaining references, by the decref, readUnlock,
// and writeUnlock methods.
func (file *file) destroy() error {
var err error
if e := syscall.Close(file.fd); e != nil {
err = &PathError{Op: "close", Path: file.name, Err: e}
}
file.fd = badFd // so it can't be closed again

// no need for a finalizer anymore
runtime.SetFinalizer(file, nil)
return err
}

Expand Down Expand Up @@ -193,6 +205,12 @@ func (f *File) Truncate(size int64) error {
if err != nil {
return &PathError{Op: "truncate", Path: f.name, Err: err}
}

if err := f.incref("truncate"); err != nil {
return err
}
defer f.decref()

if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "truncate", Path: f.name, Err: err}
}
Expand All @@ -219,6 +237,12 @@ func (f *File) chmod(mode FileMode) error {
if err != nil {
return &PathError{Op: "chmod", Path: f.name, Err: err}
}

if err := f.incref("chmod"); err != nil {
return err
}
defer f.decref()

if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "chmod", Path: f.name, Err: err}
}
Expand All @@ -240,6 +264,12 @@ func (f *File) Sync() error {
if err != nil {
return &PathError{Op: "sync", Path: f.name, Err: err}
}

if err := f.incref("sync"); err != nil {
return err
}
defer f.decref()

if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "sync", Path: f.name, Err: err}
}
Expand All @@ -249,6 +279,10 @@ func (f *File) Sync() error {
// read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any.
func (f *File) read(b []byte) (n int, err error) {
if err := f.readLock(); err != nil {
return 0, err
}
defer f.readUnlock()
n, e := fixCount(syscall.Read(f.fd, b))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
Expand All @@ -260,6 +294,10 @@ func (f *File) read(b []byte) (n int, err error) {
// It returns the number of bytes read and the error, if any.
// EOF is signaled by a zero count with err set to nil.
func (f *File) pread(b []byte, off int64) (n int, err error) {
if err := f.readLock(); err != nil {
return 0, err
}
defer f.readUnlock()
n, e := fixCount(syscall.Pread(f.fd, b, off))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
Expand All @@ -272,6 +310,10 @@ func (f *File) pread(b []byte, off int64) (n int, err error) {
// Since Plan 9 preserves message boundaries, never allow
// a zero-byte write.
func (f *File) write(b []byte) (n int, err error) {
if err := f.writeLock(); err != nil {
return 0, err
}
defer f.writeUnlock()
if len(b) == 0 {
return 0, nil
}
Expand All @@ -283,6 +325,10 @@ func (f *File) write(b []byte) (n int, err error) {
// Since Plan 9 preserves message boundaries, never allow
// a zero-byte write.
func (f *File) pwrite(b []byte, off int64) (n int, err error) {
if err := f.writeLock(); err != nil {
return 0, err
}
defer f.writeUnlock()
if len(b) == 0 {
return 0, nil
}
Expand All @@ -294,6 +340,10 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) {
// relative to the current offset, and 2 means relative to the end.
// It returns the new offset and an error, if any.
func (f *File) seek(offset int64, whence int) (ret int64, err error) {
if err := f.incref(""); err != nil {
return 0, err
}
defer f.decref()
if f.dirinfo != nil {
// Free cached dirinfo, so we allocate a new one if we
// access this file as a directory again. See #35767 and #37161.
Expand Down Expand Up @@ -493,9 +543,10 @@ func tempDir() string {
// which must be a directory.
// If there is an error, it will be of type *PathError.
func (f *File) Chdir() error {
if err := f.checkValid("chdir"); err != nil {
if err := f.incref("chdir"); err != nil {
return err
}
defer f.decref()
if e := syscall.Fchdir(f.fd); e != nil {
return &PathError{Op: "chdir", Path: f.name, Err: e}
}
Expand Down Expand Up @@ -526,16 +577,17 @@ func (f *File) setWriteDeadline(time.Time) error {
return poll.ErrNoDeadline
}

// checkValid checks whether f is valid for use.
// If not, it returns an appropriate error, perhaps incorporating the operation name op.
// checkValid checks whether f is valid for use, but does not prepare
// to actually use it. If f is not ready checkValid returns an appropriate
// error, perhaps incorporating the operation name op.
func (f *File) checkValid(op string) error {
if f == nil {
return ErrInvalid
}
if f.fd == badFd {
return &PathError{Op: op, Path: f.name, Err: ErrClosed}
if err := f.incref(op); err != nil {
return err
}
return nil
return f.decref()
}

type rawConn struct{}
Expand Down
Loading

0 comments on commit 2e29ae0

Please sign in to comment.