Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply getty config #421

Merged
merged 5 commits into from
Jan 2, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,13 @@ func initTmClient(cfg *Config) {

// initRemoting init rpc client
func initRemoting(cfg *Config) {
getty.InitRpcClient()
getty.InitRpcClient(&cfg.GettyConfig, &getty.SeataConfig{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getty.SeataConfig 和 getty.ServiceConfig组合成一个结构体吧

ApplicationID: cfg.ApplicationID,
TxServiceGroup: cfg.TxServiceGroup,
}, &getty.ServiceConfig{
VgroupMapping: cfg.ServiceConfig.VgroupMapping,
Grouplist: cfg.ServiceConfig.Grouplist,
})
}

// InitRmClient init client rm client
Expand Down
3 changes: 1 addition & 2 deletions pkg/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ import (
"runtime"
"strings"

"github.com/seata/seata-go/pkg/datasource/sql/undo"

"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/json"
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc"
Expand Down
4 changes: 1 addition & 3 deletions pkg/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func TestLoadPath(t *testing.T) {
assert.NotNil(t, cfg.GettyConfig.SessionConfig)
assert.Equal(t, 0, cfg.GettyConfig.ReconnectInterval)
assert.Equal(t, 16, cfg.GettyConfig.ConnectionNum)
assert.Equal(t, time.Second*15, cfg.GettyConfig.HeartbeatPeriod)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个为啥去了呢?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

assert.Equal(t, false, cfg.GettyConfig.SessionConfig.CompressEncoding)
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.TCPNoDelay)
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.TCPKeepAlive)
Expand Down Expand Up @@ -118,7 +117,7 @@ func TestLoadPath(t *testing.T) {
}

func TestLoadJson(t *testing.T) {
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)
assert.Equal(t, false, cfg.Enabled)
Expand Down Expand Up @@ -172,7 +171,6 @@ func TestLoadJson(t *testing.T) {
assert.NotNil(t, cfg.GettyConfig.SessionConfig)
assert.Equal(t, 1, cfg.GettyConfig.ReconnectInterval)
assert.Equal(t, 10, cfg.GettyConfig.ConnectionNum)
assert.Equal(t, time.Second*10, cfg.GettyConfig.HeartbeatPeriod)
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.CompressEncoding)
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.TCPNoDelay)
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.TCPKeepAlive)
Expand Down
2 changes: 2 additions & 0 deletions pkg/constant/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ const (
GlobalLockKey = "TX_LOCK"
SeataFilterKey = "seataDubboFilter"

SeataVersion = "1.1.0"

TccBusinessActionContextParameter = "tccParam"
)
26 changes: 24 additions & 2 deletions pkg/remoting/getty/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ package getty
import (
"flag"
"time"

"github.com/seata/seata-go/pkg/util/flagext"
)

var (
seataConfig *SeataConfig
)

type Config struct {
ReconnectInterval int `yaml:"reconnect-interval" json:"reconnect-interval" koanf:"reconnect-interval"`
ConnectionNum int `yaml:"connection-num" json:"connection-num" koanf:"connection-num"`
HeartbeatPeriod time.Duration `yaml:"heartbeat-period" json:"heartbeat-period" koanf:"heartbeat-period"`
SessionConfig SessionConfig `yaml:"session" json:"session" koanf:"session"`
}

Expand All @@ -50,7 +55,6 @@ type TransportConfig struct {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.")
f.IntVar(&cfg.ConnectionNum, prefix+".connection-num", 16, "The getty_session pool.")
f.DurationVar(&cfg.HeartbeatPeriod, prefix+".heartbeat-period", 15*time.Second, "The heartbeat period.")
cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f)
}

Expand All @@ -70,3 +74,21 @@ func (cfg *TransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS
f.DurationVar(&cfg.RPCRmRequestTimeout, prefix+".rpc-rm-request-timeout", 30*time.Second, "RM send request timeout.")
f.DurationVar(&cfg.RPCTmRequestTimeout, prefix+".rpc-tm-request-timeout", 30*time.Second, "TM send request timeout.")
}

type ServiceConfig struct {
VgroupMapping flagext.StringMap
Grouplist flagext.StringMap
}

type SeataConfig struct {
ApplicationID string
TxServiceGroup string
}

func NewGettyConfig(seataConf *SeataConfig) {
seataConfig = seataConf
}

func getSeataConfig() *SeataConfig {
return seataConfig
}
10 changes: 5 additions & 5 deletions pkg/remoting/getty/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"sync"

getty "github.com/apache/dubbo-getty"
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/constant"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/processor"
Expand All @@ -36,7 +36,7 @@ var (
)

type gettyClientHandler struct {
conf *config.ClientConfig
conf *SeataConfig
idGenerator *atomic.Uint32
msgFutures *sync.Map
mergeMsgMap *sync.Map
Expand All @@ -48,7 +48,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
if clientHandler == nil {
onceClientHandler.Do(func() {
clientHandler = &gettyClientHandler{
conf: config.GetDefaultClientConfig("seata-go"),
conf: getSeataConfig(),
idGenerator: &atomic.Uint32{},
msgFutures: &sync.Map{},
mergeMsgMap: &sync.Map{},
Expand All @@ -65,9 +65,9 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error {
g.sessionManager.registerSession(session)
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Version: g.conf.SeataVersion,
Version: constant.SeataVersion,
ApplicationId: g.conf.ApplicationID,
TransactionServiceGroup: g.conf.TransactionServiceGroup,
TransactionServiceGroup: g.conf.TxServiceGroup,
}}
err := GetGettyRemotingClient().SendAsyncRequest(request)
if err != nil {
Expand Down
80 changes: 47 additions & 33 deletions pkg/remoting/getty/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"crypto/tls"
"fmt"
"net"
"strings"
"sync"

"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/util/log"

Expand All @@ -33,40 +33,57 @@ import (
)

type RpcClient struct {
conf *config.ClientConfig
gettyConf *Config
serviceConf *ServiceConfig
seataConf *SeataConfig
gettyClients []getty.Client
futures *sync.Map
}

func InitRpcClient() {
func InitRpcClient(gettyConfig *Config, seataConfig *SeataConfig, serviceConfig *ServiceConfig) {
NewGettyConfig(seataConfig)
rpcClient := &RpcClient{
conf: config.GetClientConfig(),
gettyConf: gettyConfig,
seataConf: seataConfig,
serviceConf: serviceConfig,
gettyClients: make([]getty.Client, 0),
}
codec.Init()
rpcClient.init()
}

func (c *RpcClient) init() {
addressList := getAvailServerList()
addressList := c.getAvailServerList()
if len(addressList) == 0 {
log.Warn("no have valid seata server list")
}
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(address),
getty.WithConnectionNumber(c.conf.GettyConfig.ConnectionNum),
getty.WithReconnectInterval(c.conf.GettyConfig.ReconnectInterval),
getty.WithConnectionNumber(c.gettyConf.ConnectionNum),
getty.WithReconnectInterval(c.gettyConf.ReconnectInterval),
getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
)
go gettyClient.RunEventLoop(c.newSession)
// c.gettyClients = append(c.gettyClients, gettyClient)
}
}

// todo mock
func getAvailServerList() []string {
return []string{"127.0.0.1:8091"}
func (c *RpcClient) getAvailServerList() []string {
defaultAddressList := []string{"127.0.0.1:8091"}
txServiceGroup := c.seataConf.TxServiceGroup
if txServiceGroup == "" {
return defaultAddressList
}
clusterName := c.serviceConf.VgroupMapping[txServiceGroup]
if clusterName == "" {
return defaultAddressList
}
grouplist := c.serviceConf.Grouplist[clusterName]
if grouplist == "" {
return defaultAddressList
}
return strings.Split(grouplist, ",")
}

func (c *RpcClient) newSession(session getty.Session) error {
Expand All @@ -76,18 +93,11 @@ func (c *RpcClient) newSession(session getty.Session) error {
err error
)

if c.conf.GettyConfig.GettySessionParam.CompressEncoding {
if c.gettyConf.SessionConfig.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if _, ok = session.Conn().(*tls.Conn); ok {
session.SetName(c.conf.GettyConfig.GettySessionParam.SessionName)
session.SetMaxMsgLen(c.conf.GettyConfig.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(GetGettyClientHandlerInstance())
session.SetReadTimeout(c.conf.GettyConfig.GettySessionParam.TCPReadTimeout)
session.SetWriteTimeout(c.conf.GettyConfig.GettySessionParam.TCPWriteTimeout)
session.SetCronPeriod((int)(c.conf.GettyConfig.GettySessionParam.CronPeriod))
session.SetWaitTime(c.conf.GettyConfig.GettySessionParam.WaitTimeout)
c.setSessionConfig(session)
log.Debugf("server accepts new tls session:%s\n", session.Stat())
return nil
}
Expand All @@ -100,34 +110,38 @@ func (c *RpcClient) newSession(session getty.Session) error {
return errors.New(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}

if err = tcpConn.SetNoDelay(c.conf.GettyConfig.GettySessionParam.TCPNoDelay); err != nil {
if err = tcpConn.SetNoDelay(c.gettyConf.SessionConfig.TCPNoDelay); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(c.conf.GettyConfig.GettySessionParam.TCPKeepAlive); err != nil {
if err = tcpConn.SetKeepAlive(c.gettyConf.SessionConfig.TCPKeepAlive); err != nil {
return err
}
if c.conf.GettyConfig.GettySessionParam.TCPKeepAlive {
if err = tcpConn.SetKeepAlivePeriod(c.conf.GettyConfig.GettySessionParam.KeepAlivePeriod); err != nil {
if c.gettyConf.SessionConfig.TCPKeepAlive {
if err = tcpConn.SetKeepAlivePeriod(c.gettyConf.SessionConfig.KeepAlivePeriod); err != nil {
return err
}
}
if err = tcpConn.SetReadBuffer(c.conf.GettyConfig.GettySessionParam.TCPRBufSize); err != nil {
if err = tcpConn.SetReadBuffer(c.gettyConf.SessionConfig.TCPRBufSize); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(c.conf.GettyConfig.GettySessionParam.TCPWBufSize); err != nil {
if err = tcpConn.SetWriteBuffer(c.gettyConf.SessionConfig.TCPWBufSize); err != nil {
return err
}
}

session.SetName(c.conf.GettyConfig.GettySessionParam.SessionName)
session.SetMaxMsgLen(c.conf.GettyConfig.GettySessionParam.MaxMsgLen)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为啥去了呢?

session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(GetGettyClientHandlerInstance())
session.SetReadTimeout(c.conf.GettyConfig.GettySessionParam.TCPReadTimeout)
session.SetWriteTimeout(c.conf.GettyConfig.GettySessionParam.TCPWriteTimeout)
session.SetCronPeriod((int)(c.conf.GettyConfig.GettySessionParam.CronPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(c.conf.GettyConfig.GettySessionParam.WaitTimeout)
c.setSessionConfig(session)
log.Debugf("rpc_client new session:%s\n", session.Stat())

return nil
}

func (c *RpcClient) setSessionConfig(session getty.Session) {
session.SetName(c.gettyConf.SessionConfig.SessionName)
session.SetMaxMsgLen(c.gettyConf.SessionConfig.MaxMsgLen)
session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(GetGettyClientHandlerInstance())
session.SetReadTimeout(c.gettyConf.SessionConfig.TCPReadTimeout)
session.SetWriteTimeout(c.gettyConf.SessionConfig.TCPWriteTimeout)
session.SetCronPeriod((int)(c.gettyConf.SessionConfig.CronPeriod.Milliseconds()))
session.SetWaitTime(c.gettyConf.SessionConfig.WaitTimeout)
}
1 change: 0 additions & 1 deletion testdata/conf/seatago.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ seata:
getty:
reconnect-interval: 0
connection-num: 16
heartbeat-period: 15s
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个为啥去了呢?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为这个配置和session里面的cron-period重复了

session:
compress-encoding: false
tcp-no-delay: true
Expand Down