Skip to content

Commit

Permalink
*: make conn packet more stable
Browse files Browse the repository at this point in the history
  • Loading branch information
nange committed Feb 23, 2020
1 parent 87a486f commit 51ab713
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 14 deletions.
60 changes: 50 additions & 10 deletions cipherstream/cipherstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ import (
log "github.com/sirupsen/logrus"
)

// MAX_PAYLOAD_SIZE is the maximum size of payload, set to 16KB.
const MAX_PAYLOAD_SIZE = 1<<14 - 1
const (
// MAX_PAYLOAD_SIZE is the maximum size of payload, set to 16KB.
MaxPayloadSize = 1<<14 - 1

const FRAME_HEADER_SIZE = 9
// FrameHeaderSize is the http2 frame header size
FrameHeaderSize = 9

// PaddingSize is the http2 payload padding size
PaddingSize = 256
)

type CipherStream struct {
io.ReadWriteCloser
Expand Down Expand Up @@ -52,8 +58,8 @@ func New(stream io.ReadWriteCloser, password, method string) (io.ReadWriteCloser
return nil, errors.New("cipher method unsupported, method:" + method)
}

cs.reader.rbuf = make([]byte, MAX_PAYLOAD_SIZE+cs.NonceSize()+cs.Overhead())
cs.writer.wbuf = make([]byte, MAX_PAYLOAD_SIZE+cs.NonceSize()+cs.Overhead())
cs.reader.rbuf = make([]byte, MaxPayloadSize+cs.NonceSize()+cs.Overhead())
cs.writer.wbuf = make([]byte, MaxPayloadSize+cs.NonceSize()+cs.Overhead())

return cs, nil
}
Expand All @@ -70,17 +76,29 @@ var dataHeaderPool = sync.Pool{
},
}

var paddingPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, PaddingSize)
return buf
},
}

func (cs *CipherStream) ReadFrom(r io.Reader) (n int64, err error) {
for {
payloadBuf := cs.wbuf[:MAX_PAYLOAD_SIZE]
payloadBuf := cs.wbuf[:MaxPayloadSize]
nr, er := r.Read(payloadBuf)

if nr > 0 {
n += int64(nr)
log.Debugf("read from normal stream, frame payload size:%v ", nr)

var padding bool
headerBuf := dataHeaderPool.Get().([]byte)
headerBuf = util.EncodeHTTP2DataFrameHeader(nr, headerBuf)
if headerBuf[4] == 0x1 {
padding = true
}

headercipher, er := cs.Encrypt(headerBuf)
dataHeaderPool.Put(headerBuf)
if er != nil {
Expand All @@ -95,6 +113,17 @@ func (cs *CipherStream) ReadFrom(r io.Reader) (n int64, err error) {
}

dataframe := append(headercipher, payloadcipher...)
if padding {
padBytes := paddingPool.Get().([]byte)
padcipher, err := cs.Encrypt(padBytes)
paddingPool.Put(padBytes)
if err != nil {
log.Errorf("encrypt padding buf err:%+v", err)
return 0, ErrEncrypt
}
dataframe = append(dataframe, padcipher...)
}

if _, ew := cs.ReadWriteCloser.Write(dataframe); ew != nil {
log.Warnf("write cipher data to cipher stream failed, msg:%+v", ew)
if timeout(ew) {
Expand Down Expand Up @@ -136,14 +165,14 @@ func (cs *CipherStream) Read(b []byte) (int, error) {

cn := copy(b, payloadplain)
if cn < len(payloadplain) {
cs.leftover = payloadplain[cn:len(payloadplain)]
cs.leftover = payloadplain[cn:]
}

return cn, nil
}

func (cs *CipherStream) read() ([]byte, error) {
hbuf := cs.rbuf[:FRAME_HEADER_SIZE+cs.NonceSize()+cs.Overhead()]
hbuf := cs.rbuf[:FrameHeaderSize+cs.NonceSize()+cs.Overhead()]
if _, err := io.ReadFull(cs.ReadWriteCloser, hbuf); err != nil {
log.Warnf("read cipher stream payload len err:%+v", err)
if timeout(err) {
Expand All @@ -160,7 +189,7 @@ func (cs *CipherStream) read() ([]byte, error) {

size := int(hplain[0])<<16 | int(hplain[1])<<8 | int(hplain[2])
log.Debugf("read from cipher stream, frame payload size:%v", size)
if (size & MAX_PAYLOAD_SIZE) != size {
if (size & MaxPayloadSize) != size {
log.Errorf("read from cipherstream payload size:%+v is invalid", size)
return nil, ErrPayloadSize
}
Expand All @@ -185,12 +214,23 @@ func (cs *CipherStream) read() ([]byte, error) {
return nil, err
}

if hplain[4] == 0x1 { // has padding field
lenpadding := PaddingSize + cs.NonceSize() + cs.Overhead()
if _, err := io.ReadFull(cs.ReadWriteCloser, cs.rbuf[:lenpadding]); err != nil {
log.Warnf("read cipher stream payload padding err:%+v, lenpadding:%v", err, lenpadding)
if timeout(err) {
return nil, ErrTimeout
}
return nil, ErrReadCipher
}
}

return payloadplain, nil
}

// rstStream check the payload is RST_STREAM
func rstStream(payload []byte) (bool, error) {
if len(payload) != FRAME_HEADER_SIZE {
if len(payload) != FrameHeaderSize {
return false, nil
}
size := int(payload[0])<<16 | int(payload[1])<<8 | int(payload[2])
Expand Down
2 changes: 1 addition & 1 deletion easyss.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (ss *Easyss) Close() {
func (ss *Easyss) printStatistics() {
for {
select {
case <-time.After(5 * time.Second):
case <-time.After(15 * time.Second):
sendSize := atomic.LoadInt64(&ss.stat.BytesSend) / (1024 * 1024)
reciveSize := atomic.LoadInt64(&ss.stat.BytesRecive) / (1024 * 1024)
log.Infof("easyss send data size: %vMB, recive data size: %vMB", sendSize, reciveSize)
Expand Down
19 changes: 19 additions & 0 deletions local.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func (ss *Easyss) UDPHandle(s *socks5.Server, addr *net.UDPAddr, d *socks5.Datag
return ss.handle.UDPHandle(s, addr, d)
}

var paddingPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, cipherstream.PaddingSize)
return buf
},
}

func (ss *Easyss) localRelay(localConn net.Conn, addr string) (err error) {
var stream io.ReadWriteCloser
stream, err = ss.tcpPool.Get()
Expand Down Expand Up @@ -123,6 +130,18 @@ func (ss *Easyss) localRelay(localConn net.Conn, addr string) (err error) {
}

handshake := append(headercipher, payloadcipher...)
if header[4] == 0x1 { // has padding field
padBytes := paddingPool.Get().([]byte)
defer paddingPool.Put(padBytes)

var padcipher []byte
padcipher, err = gcm.Encrypt(padBytes)
if err != nil {
log.Errorf("encrypt padding buf err:%+v", err)
return
}
handshake = append(handshake, padcipher...)
}
_, err = stream.Write(handshake)
if err != nil {
log.Errorf("stream.Write err:%+v", errors.WithStack(err))
Expand Down
21 changes: 18 additions & 3 deletions remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,8 @@ func handShake(stream io.ReadWriter, password string) (addr []byte, ciphermethod
}

payloadlen := int(headerplain[0])<<16 | int(headerplain[1])<<8 | int(headerplain[2])
if headerplain[3] != 0x0 || headerplain[4] != 0x0 {
err = errors.New(fmt.Sprintf("http2 data frame type:%v is invalid or flag:%v is invalid, both should be 0x0",
headerplain[3], headerplain[4]))
if headerplain[3] != 0x0 {
err = errors.New(fmt.Sprintf("http2 data frame type:%v is invalid, should be 0x0", headerplain[3]))
return
}

Expand All @@ -142,6 +141,22 @@ func handShake(stream io.ReadWriter, password string) (addr []byte, ciphermethod
}
ciphermethod = DecodeCipherMethod(payloadplain[length-1])

if headerplain[4] == 0x1 { // has padding field
paddingbuf := remoteBytespool.Get(cipherstream.PaddingSize + gcm.NonceSize() + gcm.Overhead())
defer remoteBytespool.Put(paddingbuf)

if _, err = io.ReadFull(stream, paddingbuf); err != nil {
err = errors.WithStack(err)
log.Warnf("io.ReadFull read paddingbuf err:%+v, len:%v", err, len(paddingbuf))
return
}
_, err = gcm.Decrypt(paddingbuf)
if err != nil {
log.Errorf("gcm.Decrypt decrypt paddingbuf:%v, err:%+v", paddingbuf, err)
return
}
}

return payloadplain[:length-1], ciphermethod, nil
}

Expand Down
6 changes: 6 additions & 0 deletions util/http2frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/binary"
"math/rand"
"sync"

log "github.com/sirupsen/logrus"
)

var lenPool = sync.Pool{
Expand All @@ -30,6 +32,10 @@ func EncodeHTTP2DataFrameHeader(datalen int, dst []byte) (header []byte) {
dst[3] = 0x0
// set default flag
dst[4] = 0x0
if datalen < 512 { // data has padding field
log.Debugf("data payload size:%v, less than 512 bytes, we add padding field", datalen)
dst[4] = 0x1
}

// set stream identifier. note: this is temporary, will update in future
binary.BigEndian.PutUint32(dst[5:9], uint32(rand.Int31()))
Expand Down

0 comments on commit 51ab713

Please sign in to comment.