Skip to content

Commit

Permalink
Apply getty config (apache#421)
Browse files Browse the repository at this point in the history
* apply getty config
  • Loading branch information
lxfeng1997 authored and georgehao committed Feb 3, 2023
1 parent 7100886 commit bff8446
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 47 deletions.
7 changes: 6 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ func initTmClient(cfg *Config) {

// initRemoting init rpc client
func initRemoting(cfg *Config) {
getty.InitRpcClient()
getty.InitRpcClient(&cfg.GettyConfig, &getty.SeataConfig{
ApplicationID: cfg.ApplicationID,
TxServiceGroup: cfg.TxServiceGroup,
ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping,
ServiceGrouplist: cfg.ServiceConfig.Grouplist,
})
}

// InitRmClient init client rm client
Expand Down
3 changes: 1 addition & 2 deletions pkg/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ import (
"runtime"
"strings"

"github.com/seata/seata-go/pkg/datasource/sql/undo"

"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/json"
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc"
Expand Down
4 changes: 1 addition & 3 deletions pkg/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func TestLoadPath(t *testing.T) {
assert.NotNil(t, cfg.GettyConfig.SessionConfig)
assert.Equal(t, 0, cfg.GettyConfig.ReconnectInterval)
assert.Equal(t, 16, cfg.GettyConfig.ConnectionNum)
assert.Equal(t, time.Second*15, cfg.GettyConfig.HeartbeatPeriod)
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.CompressEncoding)
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.TCPNoDelay)
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.TCPKeepAlive)
Expand Down Expand Up @@ -118,7 +117,7 @@ func TestLoadPath(t *testing.T) {
}

func TestLoadJson(t *testing.T) {
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)
assert.Equal(t, false, cfg.Enabled)
Expand Down Expand Up @@ -172,7 +171,6 @@ func TestLoadJson(t *testing.T) {
assert.NotNil(t, cfg.GettyConfig.SessionConfig)
assert.Equal(t, 1, cfg.GettyConfig.ReconnectInterval)
assert.Equal(t, 10, cfg.GettyConfig.ConnectionNum)
assert.Equal(t, time.Second*10, cfg.GettyConfig.HeartbeatPeriod)
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.CompressEncoding)
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.TCPNoDelay)
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.TCPKeepAlive)
Expand Down
2 changes: 2 additions & 0 deletions pkg/constant/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ const (
GlobalLockKey = "TX_LOCK"
SeataFilterKey = "seataDubboFilter"

SeataVersion = "1.1.0"

TccBusinessActionContextParameter = "tccParam"
)
23 changes: 21 additions & 2 deletions pkg/remoting/getty/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ package getty
import (
"flag"
"time"

"github.com/seata/seata-go/pkg/util/flagext"
)

var (
seataConfig *SeataConfig
)

type Config struct {
ReconnectInterval int `yaml:"reconnect-interval" json:"reconnect-interval" koanf:"reconnect-interval"`
ConnectionNum int `yaml:"connection-num" json:"connection-num" koanf:"connection-num"`
HeartbeatPeriod time.Duration `yaml:"heartbeat-period" json:"heartbeat-period" koanf:"heartbeat-period"`
SessionConfig SessionConfig `yaml:"session" json:"session" koanf:"session"`
}

Expand All @@ -50,7 +55,6 @@ type TransportConfig struct {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.")
f.IntVar(&cfg.ConnectionNum, prefix+".connection-num", 16, "The getty_session pool.")
f.DurationVar(&cfg.HeartbeatPeriod, prefix+".heartbeat-period", 15*time.Second, "The heartbeat period.")
cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f)
}

Expand All @@ -70,3 +74,18 @@ func (cfg *TransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS
f.DurationVar(&cfg.RPCRmRequestTimeout, prefix+".rpc-rm-request-timeout", 30*time.Second, "RM send request timeout.")
f.DurationVar(&cfg.RPCTmRequestTimeout, prefix+".rpc-tm-request-timeout", 30*time.Second, "TM send request timeout.")
}

type SeataConfig struct {
ApplicationID string
TxServiceGroup string
ServiceVgroupMapping flagext.StringMap
ServiceGrouplist flagext.StringMap
}

func iniConfig(seataConf *SeataConfig) {
seataConfig = seataConf
}

func getSeataConfig() *SeataConfig {
return seataConfig
}
10 changes: 5 additions & 5 deletions pkg/remoting/getty/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"sync"

getty "github.com/apache/dubbo-getty"
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/constant"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/processor"
Expand All @@ -36,7 +36,7 @@ var (
)

type gettyClientHandler struct {
conf *config.ClientConfig
conf *SeataConfig
idGenerator *atomic.Uint32
msgFutures *sync.Map
mergeMsgMap *sync.Map
Expand All @@ -48,7 +48,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
if clientHandler == nil {
onceClientHandler.Do(func() {
clientHandler = &gettyClientHandler{
conf: config.GetDefaultClientConfig("seata-go"),
conf: getSeataConfig(),
idGenerator: &atomic.Uint32{},
msgFutures: &sync.Map{},
mergeMsgMap: &sync.Map{},
Expand All @@ -65,9 +65,9 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error {
g.sessionManager.registerSession(session)
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Version: g.conf.SeataVersion,
Version: constant.SeataVersion,
ApplicationId: g.conf.ApplicationID,
TransactionServiceGroup: g.conf.TransactionServiceGroup,
TransactionServiceGroup: g.conf.TxServiceGroup,
}}
err := GetGettyRemotingClient().SendAsyncRequest(request)
if err != nil {
Expand Down
78 changes: 45 additions & 33 deletions pkg/remoting/getty/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"crypto/tls"
"fmt"
"net"
"strings"
"sync"

"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/util/log"

Expand All @@ -33,40 +33,55 @@ import (
)

type RpcClient struct {
conf *config.ClientConfig
gettyConf *Config
seataConf *SeataConfig
gettyClients []getty.Client
futures *sync.Map
}

func InitRpcClient() {
func InitRpcClient(gettyConfig *Config, seataConfig *SeataConfig) {
iniConfig(seataConfig)
rpcClient := &RpcClient{
conf: config.GetClientConfig(),
gettyConf: gettyConfig,
seataConf: seataConfig,
gettyClients: make([]getty.Client, 0),
}
codec.Init()
rpcClient.init()
}

func (c *RpcClient) init() {
addressList := getAvailServerList()
addressList := c.getAvailServerList()
if len(addressList) == 0 {
log.Warn("no have valid seata server list")
}
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(address),
getty.WithConnectionNumber(c.conf.GettyConfig.ConnectionNum),
getty.WithReconnectInterval(c.conf.GettyConfig.ReconnectInterval),
getty.WithConnectionNumber(c.gettyConf.ConnectionNum),
getty.WithReconnectInterval(c.gettyConf.ReconnectInterval),
getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
)
go gettyClient.RunEventLoop(c.newSession)
// c.gettyClients = append(c.gettyClients, gettyClient)
}
}

// todo mock
func getAvailServerList() []string {
return []string{"127.0.0.1:8091"}
func (c *RpcClient) getAvailServerList() []string {
defaultAddressList := []string{"127.0.0.1:8091"}
txServiceGroup := c.seataConf.TxServiceGroup
if txServiceGroup == "" {
return defaultAddressList
}
clusterName := c.seataConf.ServiceVgroupMapping[txServiceGroup]
if clusterName == "" {
return defaultAddressList
}
grouplist := c.seataConf.ServiceGrouplist[clusterName]
if grouplist == "" {
return defaultAddressList
}
return strings.Split(grouplist, ",")
}

func (c *RpcClient) newSession(session getty.Session) error {
Expand All @@ -76,18 +91,11 @@ func (c *RpcClient) newSession(session getty.Session) error {
err error
)

if c.conf.GettyConfig.GettySessionParam.CompressEncoding {
if c.gettyConf.SessionConfig.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if _, ok = session.Conn().(*tls.Conn); ok {
session.SetName(c.conf.GettyConfig.GettySessionParam.SessionName)
session.SetMaxMsgLen(c.conf.GettyConfig.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(GetGettyClientHandlerInstance())
session.SetReadTimeout(c.conf.GettyConfig.GettySessionParam.TCPReadTimeout)
session.SetWriteTimeout(c.conf.GettyConfig.GettySessionParam.TCPWriteTimeout)
session.SetCronPeriod((int)(c.conf.GettyConfig.GettySessionParam.CronPeriod))
session.SetWaitTime(c.conf.GettyConfig.GettySessionParam.WaitTimeout)
c.setSessionConfig(session)
log.Debugf("server accepts new tls session:%s\n", session.Stat())
return nil
}
Expand All @@ -100,34 +108,38 @@ func (c *RpcClient) newSession(session getty.Session) error {
return errors.New(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}

if err = tcpConn.SetNoDelay(c.conf.GettyConfig.GettySessionParam.TCPNoDelay); err != nil {
if err = tcpConn.SetNoDelay(c.gettyConf.SessionConfig.TCPNoDelay); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(c.conf.GettyConfig.GettySessionParam.TCPKeepAlive); err != nil {
if err = tcpConn.SetKeepAlive(c.gettyConf.SessionConfig.TCPKeepAlive); err != nil {
return err
}
if c.conf.GettyConfig.GettySessionParam.TCPKeepAlive {
if err = tcpConn.SetKeepAlivePeriod(c.conf.GettyConfig.GettySessionParam.KeepAlivePeriod); err != nil {
if c.gettyConf.SessionConfig.TCPKeepAlive {
if err = tcpConn.SetKeepAlivePeriod(c.gettyConf.SessionConfig.KeepAlivePeriod); err != nil {
return err
}
}
if err = tcpConn.SetReadBuffer(c.conf.GettyConfig.GettySessionParam.TCPRBufSize); err != nil {
if err = tcpConn.SetReadBuffer(c.gettyConf.SessionConfig.TCPRBufSize); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(c.conf.GettyConfig.GettySessionParam.TCPWBufSize); err != nil {
if err = tcpConn.SetWriteBuffer(c.gettyConf.SessionConfig.TCPWBufSize); err != nil {
return err
}
}

session.SetName(c.conf.GettyConfig.GettySessionParam.SessionName)
session.SetMaxMsgLen(c.conf.GettyConfig.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(GetGettyClientHandlerInstance())
session.SetReadTimeout(c.conf.GettyConfig.GettySessionParam.TCPReadTimeout)
session.SetWriteTimeout(c.conf.GettyConfig.GettySessionParam.TCPWriteTimeout)
session.SetCronPeriod((int)(c.conf.GettyConfig.GettySessionParam.CronPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(c.conf.GettyConfig.GettySessionParam.WaitTimeout)
c.setSessionConfig(session)
log.Debugf("rpc_client new session:%s\n", session.Stat())

return nil
}

func (c *RpcClient) setSessionConfig(session getty.Session) {
session.SetName(c.gettyConf.SessionConfig.SessionName)
session.SetMaxMsgLen(c.gettyConf.SessionConfig.MaxMsgLen)
session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(GetGettyClientHandlerInstance())
session.SetReadTimeout(c.gettyConf.SessionConfig.TCPReadTimeout)
session.SetWriteTimeout(c.gettyConf.SessionConfig.TCPWriteTimeout)
session.SetCronPeriod((int)(c.gettyConf.SessionConfig.CronPeriod.Milliseconds()))
session.SetWaitTime(c.gettyConf.SessionConfig.WaitTimeout)
}
1 change: 0 additions & 1 deletion testdata/conf/seatago.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ seata:
getty:
reconnect-interval: 0
connection-num: 16
heartbeat-period: 15s
session:
compress-encoding: false
tcp-no-delay: true
Expand Down

0 comments on commit bff8446

Please sign in to comment.