Skip to content

Commit

Permalink
[improvement] proxy: filter duplicate set stmts. (#20829)
Browse files Browse the repository at this point in the history
filter duplicate set stmts in proxy.

Approved by: @zhangxu19830126, @sukki37
  • Loading branch information
volgariver6 authored Dec 19, 2024
1 parent 5b69b1f commit cfdc3d1
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
15 changes: 14 additions & 1 deletion pkg/proxy/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ type ClientConn interface {
}

type migration struct {
setVarStmts []string
setVarStmtMap map[string]struct{}
setVarStmts []string
}

// clientConn is the connection between proxy and client.
Expand Down Expand Up @@ -231,6 +232,7 @@ func newClientConn(
}
c.tlsConfig = tlsConfig
}
c.migration.setVarStmtMap = make(map[string]struct{})
return c, nil
}

Expand Down Expand Up @@ -408,6 +410,17 @@ func (c *clientConn) handleKill(e *killEvent, resp chan<- []byte) error {
// handleSetVar handles the set variable event.
func (c *clientConn) handleSetVar(e *setVarEvent) error {
defer e.notify()
_, ok := c.migration.setVarStmtMap[e.stmt]
if ok {
for i := 0; i < len(c.migration.setVarStmts); i++ {
if c.migration.setVarStmts[i] == e.stmt {
c.migration.setVarStmts = append(c.migration.setVarStmts[:i], c.migration.setVarStmts[i+1:]...)
i--
}
}
} else {
c.migration.setVarStmtMap[e.stmt] = struct{}{}
}
c.migration.setVarStmts = append(c.migration.setVarStmts, e.stmt)
return nil
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/proxy/client_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,41 @@ func TestClientConn_SendErrToClient(t *testing.T) {
cc.SendErrToClient(moerr.NewInternalErrorNoCtx("msg1"))
wg.Wait()
}

func TestHandleSetVar(t *testing.T) {
defer leaktest.AfterTest(t)()
var cc clientConn
cc.migration.setVarStmtMap = make(map[string]struct{})
e0 := &setVarEvent{
baseEvent: baseEvent{waitC: make(chan struct{}, 5)},
stmt: "set autocommit=0",
}
require.NoError(t, cc.handleSetVar(e0))
require.Equal(t, 1, len(cc.migration.setVarStmtMap))
require.Equal(t, 1, len(cc.migration.setVarStmts))
require.Equal(t, e0.stmt, cc.migration.setVarStmts[len(cc.migration.setVarStmts)-1])

require.NoError(t, cc.handleSetVar(e0))
require.Equal(t, 1, len(cc.migration.setVarStmtMap))
require.Equal(t, 1, len(cc.migration.setVarStmts))
require.Equal(t, e0.stmt, cc.migration.setVarStmts[len(cc.migration.setVarStmts)-1])

e1 := &setVarEvent{
baseEvent: baseEvent{waitC: make(chan struct{}, 5)},
stmt: "set autocommit=1",
}
require.NoError(t, cc.handleSetVar(e1))
require.Equal(t, 2, len(cc.migration.setVarStmtMap))
require.Equal(t, 2, len(cc.migration.setVarStmts))
require.Equal(t, e1.stmt, cc.migration.setVarStmts[len(cc.migration.setVarStmts)-1])

require.NoError(t, cc.handleSetVar(e0))
require.Equal(t, 2, len(cc.migration.setVarStmtMap))
require.Equal(t, 2, len(cc.migration.setVarStmts))
require.Equal(t, e0.stmt, cc.migration.setVarStmts[len(cc.migration.setVarStmts)-1])

require.NoError(t, cc.handleSetVar(e1))
require.Equal(t, 2, len(cc.migration.setVarStmtMap))
require.Equal(t, 2, len(cc.migration.setVarStmts))
require.Equal(t, e1.stmt, cc.migration.setVarStmts[len(cc.migration.setVarStmts)-1])
}

0 comments on commit cfdc3d1

Please sign in to comment.