Skip to content

并发回放实现

hueng edited this page Apr 29, 2020 · 8 revisions

通过 回放剧本传递 可知,mock库解决了并发度 = 1的回放问题。

为了提高RD和QA的测试效率,需要支持批量并发度 > 1的并发回放。

一、方案

通过sessionID关联Web Server、SUT Server、Mock Server,跨进程间传递,使Mock Server能够区分回放的流量。

二、时序图

时序图

三、主要流程

3.1、传递sessionID【Web Server -> SUT Server】

方案:由于这一步是通过http协议传输inbound请求,所以可以简单的通过加Header传递。

代码片段:

{
    s := bytes.Split(request, []byte("\r\n"))
    s[0] = append(s[0], []byte("\r\nSharingan-Replayer-Traceid: "+traceid)...)  
    request = bytes.Join(s, []byte("\r\n"))
}

3.2、维护映射关系【SUT Server】

方案:重写TCPConn.Read()逻辑,接受inbound请求时解析出sessionID,并保存goid和sessionID的映射关系。

依赖:为了保证inbound请求outbound请求时goid一致,必须使用定制版golang。详见:链路追踪原理

代码片段:

func MockTCPConnRead() {
    var c *net.TCPConn

    monkey.MockMemberFunc(reflect.TypeOf(c), "Read", func(conn *net.TCPConn, b []byte) (int, error) {
        traceID := ""

        // get header
        traceRegex := regexp.MustCompile(`Sharingan-Replayer-Traceid: (\w{32})\r\n`)
        if ss := traceRegex.FindAllSubmatch(b, -1); len(ss) >= 1 {
            traceID = string(ss[0][1])
        }

        // set globalThreads
        if traceID != "" {
            threadID := runtime.GetCurrentGoRoutineId()
            globalThreadsMutex.Lock()
            globalThreads[threadID] = &Thread{traceID: traceID}
            globalThreadsMutex.Unlock()
        }

        // 添加原生同功能函数,绕过monkey mock库并发问题
        return conn.Read2(b)
    })
}

3.3、添加流量标识【SUT Server -> Mock Server】

方案:重写TCPConn.Write()逻辑,对外outbound请求时根据goid取出sessionID,并添加sessionID流量标识。

代码片段:

func MockTCPConnWrite() {
    var c *net.TCPConn

    monkey.MockMemberFunc(reflect.TypeOf(c), "Write", func(conn *net.TCPConn, b []byte) (int, error) {
        traceID := ""

        // get traceID
        threadID := runtime.GetCurrentGoRoutineId()
        globalThreadsMutex.RLock()
        if thread := globalThreads[threadID]; thread != nil {
            traceID = thread.traceID
        }
        globalThreadsMutex.RUnlock()

        // 加流量标识, 示例:```/*{"sid":"1587446044518547700"}*/```
        prefix := fmt.Sprintf(trafficPrefix, traceID, remoteAddr)
        newb := append([]byte(prefix), b...)
        newn, err := conn.Write2(newb)

        return newn - len(prefix), err
    })
}

四、其它

1、为了支持单个流量并发回放,将流量的sessionID传递改成traceID。

2、Mock Server接受到的请求可能是分段传输的,需要全局替换流量标识。

3、部分outbound请求,会将inbound请求所有参数(包括header)传递,所以3.2这一步要去掉header。