Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CancelWithErr mechanism #11

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions djherbis-stream.iml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
6 changes: 4 additions & 2 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ func (r *Reader) read(p []byte, off *int64) (n int, err error) {
}

func (r *Reader) checkErr(err error) error {
switch err {
case ErrCanceled:
switch {
case err == ErrCanceled,
err != nil && err == r.s.b.err:
r.Close()
}

return err
}

Expand Down
7 changes: 7 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ func (s *Stream) Cancel() error {
return s.Close() // all writes are stopped
}

// CancelWithErr works like Stream.Cancel, but permits a custom error
// to be returned.
func (s *Stream) CancelWithErr(err error) error {
s.b.CancelWithErr(err) // all existing reads are canceled, no new reads will occur, all readers closed
return s.Close() // all writes are stopped
}

// NextReader will return a concurrent-safe Reader for this stream. Each Reader will
// see a complete and independent view of the stream, and can Read while the stream
// is written to.
Expand Down
110 changes: 110 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,67 @@ func testCancelBeforeClose(t *testing.T, fs FileSystem) {
cleanup(f, t)
}

func TestCancelWithErrBeforeClose(t *testing.T) {
for _, fs := range GetFilesystems() {
testCancelWithErrBeforeClose(t, fs)
}
}

func testCancelWithErrBeforeClose(t *testing.T, fs FileSystem) {
wantErr := errors.New("oh dear")
f, err := NewStream(t.Name()+".txt", fs)
if err != nil {
t.Error(err)
t.FailNow()
}
f.Write([]byte("Hello"))
r, err := f.NextReader() // blocking reader
if err != nil {
t.Error("error creating new reader: ", err)
}

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
_, err := ioutil.ReadAll(r)
if err != wantErr {
t.Error("Read after cancel should return an error")
}
wg.Done()
}()
<-time.After(50 * time.Millisecond) // give Reader time to block, this tests it unblocks

// When canceling writer, reader is closed, so writer unblocks and test passes
f.CancelWithErr(wantErr)
// Double cancel should not affect the outcome
f.CancelWithErr(wantErr)
// Close after cancel should not affect the outcome
f.Close()

// ReadAt after cancel
_, err = ioutil.ReadAll(io.NewSectionReader(r, 0, 1))
if err != wantErr {
t.Error("ReadAt after cancel should return an error")
}

// NextReader should fail as well
_, err = f.NextReader()
if err != wantErr {
t.Error("NextReader should be canceled, but got: ", err)
}

n, err := f.Write([]byte("world"))
// Writer is closed as well
if err == nil {
t.Error("expected write after canceling to fail")
}
if n != 0 {
t.Error("expected write after canceling to not write anything")
}
wg.Wait()
cleanup(f, t)
}

func TestCancelAfterClose(t *testing.T) {
for _, fs := range GetFilesystems() {
testCancelAfterClose(t, fs)
Expand Down Expand Up @@ -369,6 +430,55 @@ func testCancelAfterClose(t *testing.T, fs FileSystem) {
cleanup(f, t)
}

func TestCancelWithErrAfterClose(t *testing.T) {
for _, fs := range GetFilesystems() {
testCancelWithErrAfterClose(t, fs)
}
}

func testCancelWithErrAfterClose(t *testing.T, fs FileSystem) {
wantErr := errors.New("oh dear")
f, err := NewStream(t.Name()+".txt", fs)
if err != nil {
t.Error(err)
t.FailNow()
}

r, _ := f.NextReader()

wg := sync.WaitGroup{}

wg.Add(2)

f.Write([]byte("Hello"))
f.Close()

// This unblocks and cancels any future reads
f.CancelWithErr(wantErr)

go func() {
time.Sleep(50 * time.Millisecond)
_, err := f.NextReader()
if err != wantErr {
t.Error("Opening new reader after canceling should fail")
}
wg.Done()
}()

go func() {
time.Sleep(50 * time.Millisecond)
_, err := ioutil.ReadAll(r)
if err != wantErr {
t.Error("If canceling after closing, already opened readers should finish")
}
wg.Done()
}()

wg.Wait()

cleanup(f, t)
}

func TestShutdownAfterClose(t *testing.T) {
for _, fs := range GetFilesystems() {
testShutdownAfterClose(t, fs)
Expand Down
45 changes: 33 additions & 12 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ const (
)

type broadcaster struct {
mu sync.RWMutex
cond *sync.Cond
state streamState
size int64
newHandleErr error
rs *readerSet
fileInUse sync.WaitGroup
mu sync.RWMutex
cond *sync.Cond
state streamState
size int64
err error
rs *readerSet
fileInUse sync.WaitGroup
}

func newBroadcaster() *broadcaster {
Expand All @@ -50,6 +50,9 @@ func (b *broadcaster) Wait(r *Reader, off int64) error {

switch b.state {
case canceledState:
if b.err != nil {
return b.err
}
return ErrCanceled

case closedState:
Expand Down Expand Up @@ -97,15 +100,29 @@ func (b *broadcaster) Cancel() (err error) {
return nil
}

func (b *broadcaster) CancelWithErr(cancelErr error) (err error) {
b.mu.Lock()
b.setState(canceledState)
b.preventNewHandles(cancelErr)
readersToClose := b.rs.dropAll()
b.mu.Unlock()

for _, r := range readersToClose {
r.Close()
}

return nil
}

func (b *broadcaster) PreventNewHandles(err error) {
b.mu.Lock()
b.preventNewHandles(err)
b.mu.Unlock()
}

func (b *broadcaster) preventNewHandles(err error) {
if b.newHandleErr == nil {
b.newHandleErr = err
if b.err == nil {
b.err = err
}
}

Expand All @@ -118,7 +135,11 @@ func (b *broadcaster) UseHandle(do func() (int, error)) (int, error) {
switch b.state {
case canceledState:
b.mu.RUnlock()
return 0, ErrCanceled
err := b.err
if err == nil {
err = ErrCanceled
}
return 0, err
}
b.mu.RUnlock()

Expand Down Expand Up @@ -149,8 +170,8 @@ func (b *broadcaster) Size() (size int64, isClosed bool) {
func (b *broadcaster) addHandle() error {
b.mu.RLock()
defer b.mu.RUnlock()
if b.newHandleErr != nil {
return b.newHandleErr
if b.err != nil {
return b.err
}

b.fileInUse.Add(1)
Expand Down