From c27ac98dbde3994409d2dd9f77701af24298e900 Mon Sep 17 00:00:00 2001 From: Kale Blankenship Date: Wed, 7 Feb 2018 20:48:13 -0800 Subject: [PATCH] Send End and wait for response on Session.Close() --- client.go | 76 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 57 insertions(+), 19 deletions(-) diff --git a/client.go b/client.go index fedc989d..8d42a9bd 100644 --- a/client.go +++ b/client.go @@ -154,6 +154,12 @@ type Session struct { allocateHandle chan *link // link handles are allocated by sending a link on this channel, nil is sent on link.rx once allocated deallocateHandle chan *link // link handles are deallocated by sending a link on this channel + + // used for gracefully closing link + close chan struct{} + closeOnce sync.Once + done chan struct{} + err error } func newSession(c *conn, channel uint16) *Session { @@ -168,18 +174,16 @@ func newSession(c *conn, channel uint16) *Session { outgoingWindow: math.MaxUint32, allocateHandle: make(chan *link), deallocateHandle: make(chan *link), + close: make(chan struct{}), + done: make(chan struct{}), } } // Close closes the session. func (s *Session) Close() error { - // TODO: send end preformative (if Begin has been exchanged) - select { - case <-s.conn.done: - return s.conn.getErr() - case s.conn.delSession <- s: - return nil - } + s.closeOnce.Do(func() { close(s.close) }) + <-s.done + return s.err } // txFrame sends a frame to the connWriter @@ -315,8 +319,7 @@ func (s *Sender) Address() string { func (s *Sender) Close() error { // TODO: Should this timeout? Close() take a context? Use one of the // other timeouts? - s.link.Close() - return s.link.err + return s.link.Close() } // NewSender opens a new sender link on the session. @@ -330,6 +333,8 @@ func (s *Session) NewSender(opts ...LinkOption) (*Sender, error) { } func (s *Session) mux(remoteBegin *performBegin) { + defer close(s.done) + var ( links = make(map[uint32]*link) // mapping of remote handles to links linksByName = make(map[string]*link) // maping of names to links @@ -384,6 +389,34 @@ func (s *Session) mux(remoteBegin *performBegin) { select { // conn has completed, exit case <-s.conn.done: + s.err = s.conn.getErr() + return + + // session is being closed by user + case <-s.close: + s.txFrame(&performEnd{}, nil) + + // discard frames until End is received or conn closed + EndLoop: + for { + select { + case fr := <-s.rx: + _, ok := fr.body.(*performEnd) + if ok { + break EndLoop + } + case <-s.conn.done: + s.err = s.conn.getErr() + return + } + } + + // release session + select { + case s.conn.delSession <- s: + case <-s.conn.done: + s.err = s.conn.getErr() + } return // handle allocation request @@ -514,6 +547,11 @@ func (s *Session) mux(remoteBegin *performBegin) { case link.rx <- fr.body: } + case *performEnd: + s.txFrame(&performEnd{}, nil) + s.err = errorErrorf("session ended by server: %s", body.Error) + return + default: fmt.Printf("Unexpected frame: %s\n", body) } @@ -882,8 +920,8 @@ func (l *link) mux() { } case <-l.close: return - case <-l.session.conn.done: - l.err = l.session.conn.getErr() + case <-l.session.done: + l.err = l.session.err return } } @@ -908,8 +946,8 @@ func (l *link) mux() { } case <-l.close: return - case <-l.session.conn.done: - l.err = l.session.conn.getErr() + case <-l.session.done: + l.err = l.session.err return } } @@ -923,8 +961,8 @@ func (l *link) mux() { } case <-l.close: return - case <-l.session.conn.done: - l.err = l.session.conn.getErr() + case <-l.session.done: + l.err = l.session.err return } } @@ -933,9 +971,10 @@ func (l *link) mux() { // close closes and requests deletion of the link. // // No operations on link are valid after close. -func (l *link) Close() { +func (l *link) Close() error { l.closeOnce.Do(func() { close(l.close) }) <-l.done + return l.err } func (l *link) detach() { @@ -1228,8 +1267,7 @@ func (r *Receiver) Address() string { func (r *Receiver) Close() error { // TODO: Should this timeout? Close() take a context? Use one of the // other timeouts? - r.link.Close() - return r.link.err + return r.link.Close() } type messageDisposition struct { @@ -1312,7 +1350,7 @@ func (r *Receiver) dispositionBatcher() { batchStarted = false batchTimer.Stop() - case <-r.link.session.conn.done: // TODO: this should exit if link or session is closed + case <-r.link.done: return } }