Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Sep 7, 2016
1 parent f78c306 commit eacccc2
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
3 changes: 1 addition & 2 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,14 @@ func (f *Frame) UnmarshalBinary(bts []byte) error {

// zeroCopyUnmarshal a byte slice into a frame,
// and just reference to the input slice
func (f *Frame) zeroCopyUnmarshal(bts []byte) error {
func (f *Frame) zeroCopyUnmarshal(bts []byte) {
f.ver = bts[0]
f.cmd = bts[1]
datalength := binary.LittleEndian.Uint16(bts[2:])
f.sid = binary.LittleEndian.Uint32(bts[4:])
if datalength > 0 {
f.data = bts[headerSize:]
}
return nil
}

type rawHeader []byte
Expand Down
14 changes: 8 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (

const (
errBrokenPipe = "broken pipe"
errConnReset = "connection reset by peer"
errInvalidProtocol = "invalid protocol version"
)

Expand Down Expand Up @@ -67,11 +68,13 @@ func (s *Session) OpenStream() (*Stream, error) {
sid := atomic.AddUint32(&s.nextStreamID, 2)
stream := newStream(sid, s.config.MaxFrameSize, s)

if _, err := s.writeFrame(newFrame(cmdSYN, sid)); err != nil {
return nil, errors.Wrap(err, "writeFrame")
}

s.streamLock.Lock()
s.streams[sid] = stream
s.streamLock.Unlock()

s.writeFrame(newFrame(cmdSYN, sid))
return stream, nil
}

Expand All @@ -87,24 +90,23 @@ func (s *Session) AcceptStream() (*Stream, error) {
}

// Close is used to close the session and all streams.
func (s *Session) Close() error {
func (s *Session) Close() (err error) {
s.dieLock.Lock()
defer s.dieLock.Unlock()

select {
case <-s.die:
return errors.New(errBrokenPipe)
default:
close(s.die)
s.streamLock.Lock()
for k := range s.streams {
s.streams[k].sessionClose()
}
s.streamLock.Unlock()
s.conn.Close()
close(s.die)
s.bucketCond.Signal()
return s.conn.Close()
}
return nil
}

// IsClosed does a safe check to see if we have shutdown
Expand Down
8 changes: 4 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ READ:
s.sess.returnTokens(n)
return n, nil
} else if atomic.LoadInt32(&s.rstflag) == 1 {
s.Close()
return 0, errors.New(errBrokenPipe)
_ = s.Close()
return 0, errors.New(errConnReset)
}

select {
Expand Down Expand Up @@ -94,9 +94,9 @@ func (s *Stream) Close() error {
default:
close(s.die)
s.sess.streamClosed(s.id)
s.sess.writeFrame(newFrame(cmdRST, s.id))
_, err := s.sess.writeFrame(newFrame(cmdRST, s.id))
return err
}
return nil
}

// session closes the stream
Expand Down

0 comments on commit eacccc2

Please sign in to comment.