Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Send End and wait for response on Session.Close()
Browse files Browse the repository at this point in the history
  • Loading branch information
vcabbage committed Feb 11, 2018
1 parent c6dead2 commit c27ac98
Showing 1 changed file with 57 additions and 19 deletions.
76 changes: 57 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ type Session struct {

allocateHandle chan *link // link handles are allocated by sending a link on this channel, nil is sent on link.rx once allocated
deallocateHandle chan *link // link handles are deallocated by sending a link on this channel

// used for gracefully closing link
close chan struct{}
closeOnce sync.Once
done chan struct{}
err error
}

func newSession(c *conn, channel uint16) *Session {
Expand All @@ -168,18 +174,16 @@ func newSession(c *conn, channel uint16) *Session {
outgoingWindow: math.MaxUint32,
allocateHandle: make(chan *link),
deallocateHandle: make(chan *link),
close: make(chan struct{}),
done: make(chan struct{}),
}
}

// Close closes the session.
func (s *Session) Close() error {
// TODO: send end preformative (if Begin has been exchanged)
select {
case <-s.conn.done:
return s.conn.getErr()
case s.conn.delSession <- s:
return nil
}
s.closeOnce.Do(func() { close(s.close) })
<-s.done
return s.err
}

// txFrame sends a frame to the connWriter
Expand Down Expand Up @@ -315,8 +319,7 @@ func (s *Sender) Address() string {
func (s *Sender) Close() error {
// TODO: Should this timeout? Close() take a context? Use one of the
// other timeouts?
s.link.Close()
return s.link.err
return s.link.Close()
}

// NewSender opens a new sender link on the session.
Expand All @@ -330,6 +333,8 @@ func (s *Session) NewSender(opts ...LinkOption) (*Sender, error) {
}

func (s *Session) mux(remoteBegin *performBegin) {
defer close(s.done)

var (
links = make(map[uint32]*link) // mapping of remote handles to links
linksByName = make(map[string]*link) // maping of names to links
Expand Down Expand Up @@ -384,6 +389,34 @@ func (s *Session) mux(remoteBegin *performBegin) {
select {
// conn has completed, exit
case <-s.conn.done:
s.err = s.conn.getErr()
return

// session is being closed by user
case <-s.close:
s.txFrame(&performEnd{}, nil)

// discard frames until End is received or conn closed
EndLoop:
for {
select {
case fr := <-s.rx:
_, ok := fr.body.(*performEnd)
if ok {
break EndLoop
}
case <-s.conn.done:
s.err = s.conn.getErr()
return
}
}

// release session
select {
case s.conn.delSession <- s:
case <-s.conn.done:
s.err = s.conn.getErr()
}
return

// handle allocation request
Expand Down Expand Up @@ -514,6 +547,11 @@ func (s *Session) mux(remoteBegin *performBegin) {
case link.rx <- fr.body:
}

case *performEnd:
s.txFrame(&performEnd{}, nil)
s.err = errorErrorf("session ended by server: %s", body.Error)
return

default:
fmt.Printf("Unexpected frame: %s\n", body)
}
Expand Down Expand Up @@ -882,8 +920,8 @@ func (l *link) mux() {
}
case <-l.close:
return
case <-l.session.conn.done:
l.err = l.session.conn.getErr()
case <-l.session.done:
l.err = l.session.err
return
}
}
Expand All @@ -908,8 +946,8 @@ func (l *link) mux() {
}
case <-l.close:
return
case <-l.session.conn.done:
l.err = l.session.conn.getErr()
case <-l.session.done:
l.err = l.session.err
return
}
}
Expand All @@ -923,8 +961,8 @@ func (l *link) mux() {
}
case <-l.close:
return
case <-l.session.conn.done:
l.err = l.session.conn.getErr()
case <-l.session.done:
l.err = l.session.err
return
}
}
Expand All @@ -933,9 +971,10 @@ func (l *link) mux() {
// close closes and requests deletion of the link.
//
// No operations on link are valid after close.
func (l *link) Close() {
func (l *link) Close() error {
l.closeOnce.Do(func() { close(l.close) })
<-l.done
return l.err
}

func (l *link) detach() {
Expand Down Expand Up @@ -1228,8 +1267,7 @@ func (r *Receiver) Address() string {
func (r *Receiver) Close() error {
// TODO: Should this timeout? Close() take a context? Use one of the
// other timeouts?
r.link.Close()
return r.link.err
return r.link.Close()
}

type messageDisposition struct {
Expand Down Expand Up @@ -1312,7 +1350,7 @@ func (r *Receiver) dispositionBatcher() {
batchStarted = false
batchTimer.Stop()

case <-r.link.session.conn.done: // TODO: this should exit if link or session is closed
case <-r.link.done:
return
}
}
Expand Down

0 comments on commit c27ac98

Please sign in to comment.