Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenStream似乎有并行性能问题 #97

Open
WSitong opened this issue Feb 4, 2024 · 0 comments
Open

OpenStream似乎有并行性能问题 #97

WSitong opened this issue Feb 4, 2024 · 0 comments

Comments

@WSitong
Copy link

WSitong commented Feb 4, 2024

我在测试自己的代码时发现OpenStream的并行性能存在问题,甚至不如串行。不知道是不是运行平台或设备的原因,在我的电脑上运行下面的代码可以复现这个问题(并行用时12.8s,串行用时1.2s)。下面的代码会运行一个回显服务器,Test_Serial和Test_Parallel作为客户端,会先创建一个smux.Session,再开1000个协程分别创建stream,然后发送和接收数据。两者唯一的区别是Test_Serial在调用OpenStream时加了锁:

package openstream_test

import (
	"github.com/xtaci/kcp-go/v5"
	"github.com/xtaci/smux"
	"log"
	"net"
	"sync"
	"testing"
	"time"
)

// listener is the echo server's listener. It stays nil until runServer has
// bound its port and assigned it; the tests use a nil check on it to decide
// whether the server still has to be started.
//
// NOTE(review): it is assigned from the goroutine running runServer and read
// from the test functions without any synchronization.
var listener net.Listener

// Test_Serial measures the serialized baseline: 1000 goroutines share a single
// smux session, and every call to OpenStream is made while holding a mutex.
//
// Each goroutine opens one stream, writes a short message to the server at
// 127.0.0.1:8000 and reads one reply. Setup failures abort the test, and any
// failure inside a goroutine is reported so that a broken run is not mistaken
// for a fast one.
func Test_Serial(t *testing.T) {
	const streamCount = 1000

	// Start the echo server on first use and give it a moment to begin listening.
	if listener == nil {
		go runServer()
		time.Sleep(time.Second)
	}
	conn, err := kcp.DialWithOptions("127.0.0.1:8000", nil, 0, 0)
	if err != nil {
		// Fatal rather than Error: there is no connection to continue with.
		t.Fatalf("dial echo server: %v", err)
	}
	sess, err := smux.Client(conn, smux.DefaultConfig())
	if err != nil {
		// Fatal rather than Error: the goroutines below would call OpenStream
		// on a session that was never created.
		t.Fatalf("create smux client session: %v", err)
	}
	// NOTE(review): the session and connection are not closed here; confirm the
	// echo server tolerates a peer disconnect before adding cleanup.

	// Use one smux session and open the streams one at a time.
	data := []byte("this is s test message")
	wg := sync.WaitGroup{}
	mux := sync.Mutex{}
	for i := 0; i < streamCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Only OpenStream is inside the critical section; the write and
			// read that follow run concurrently across goroutines.
			mux.Lock()
			stream, err := sess.OpenStream()
			mux.Unlock()
			if err != nil {
				// Errorf, not Fatalf: FailNow must not be called off the test goroutine.
				t.Errorf("open stream: %v", err)
				return
			}
			if _, err := stream.Write(data); err != nil {
				t.Errorf("write to stream: %v", err)
				return
			}
			buffer := make([]byte, 1024)
			if _, err := stream.Read(buffer); err != nil {
				t.Errorf("read from stream: %v", err)
			}
		}()
	}
	wg.Wait()
}

// Test_Parallel measures the concurrent case: 1000 goroutines share a single
// smux session and call OpenStream with no locking between them.
//
// Each goroutine opens one stream, writes a short message to the server at
// 127.0.0.1:8000 and reads one reply. Setup failures abort the test, and any
// failure inside a goroutine is reported so that a broken run is not mistaken
// for a fast one.
func Test_Parallel(t *testing.T) {
	const streamCount = 1000

	// Start the echo server on first use and give it a moment to begin listening.
	if listener == nil {
		go runServer()
		time.Sleep(time.Second)
	}
	conn, err := kcp.DialWithOptions("127.0.0.1:8000", nil, 0, 0)
	if err != nil {
		// Fatal rather than Error: there is no connection to continue with.
		t.Fatalf("dial echo server: %v", err)
	}
	sess, err := smux.Client(conn, smux.DefaultConfig())
	if err != nil {
		// Fatal rather than Error: the goroutines below would call OpenStream
		// on a session that was never created.
		t.Fatalf("create smux client session: %v", err)
	}
	// NOTE(review): the session and connection are not closed here; confirm the
	// echo server tolerates a peer disconnect before adding cleanup.

	// Use one smux session and open all streams concurrently.
	data := []byte("this is s test message")
	wg := sync.WaitGroup{}
	for i := 0; i < streamCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Deliberately no mutex around OpenStream: unguarded concurrent
			// opening is the behaviour this test measures.
			stream, err := sess.OpenStream()
			if err != nil {
				// Errorf, not Fatalf: FailNow must not be called off the test goroutine.
				t.Errorf("open stream: %v", err)
				return
			}
			if _, err := stream.Write(data); err != nil {
				t.Errorf("write to stream: %v", err)
				return
			}
			buffer := make([]byte, 1024)
			if _, err := stream.Read(buffer); err != nil {
				t.Errorf("read from stream: %v", err)
			}
		}()
	}
	wg.Wait()
}

// runServer runs the echo server used by the tests. It listens on :8000,
// stores the listener in the package-level listener variable, and then
// accepts connections forever; it only returns by terminating the process.
//
// Every accepted connection is wrapped in an smux server session. For each
// stream accepted on that session, one read is performed and the bytes read
// are written back.
//
// Failing to listen or to accept is fatal because the server cannot do its
// job at all. A failure on one session or one stream is only logged and ends
// that session's or stream's goroutine, so a single peer going away does not
// take the whole process down.
//
// NOTE(review): listener is assigned here without synchronization while the
// tests read it from other goroutines.
func runServer() {
	lis, err := kcp.ListenWithOptions(":8000", nil, 0, 0)
	if err != nil {
		log.Fatalf("listen on :8000: %v", err)
	}
	listener = lis
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Fatalf("accept connection: %v", err)
		}
		go func() {
			defer conn.Close()
			sess, err := smux.Server(conn, smux.DefaultConfig())
			if err != nil {
				log.Printf("create smux server session: %v", err)
				return
			}
			for {
				stream, err := sess.AcceptStream()
				if err != nil {
					// The session is no longer usable; drop this connection only.
					log.Printf("accept stream: %v", err)
					return
				}
				go func() {
					defer stream.Close()
					buffer := make([]byte, 1024)
					n, err := stream.Read(buffer)
					if err != nil {
						log.Printf("read from stream: %v", err)
						return
					}
					if _, err := stream.Write(buffer[:n]); err != nil {
						log.Printf("echo to stream: %v", err)
						return
					}
					// Keep the stream open for a minute after echoing.
					// NOTE(review): presumably so the client can finish reading
					// before the stream goes away; confirm.
					time.Sleep(time.Minute)
				}()
			}
		}()
	}
}
@WSitong WSitong changed the title OpenStream似乎有并发性能问题 OpenStream似乎有并行性能问题 Feb 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant