Skip to content

Commit

Permalink
chore: release v0.7.2 (#1127)
Browse files Browse the repository at this point in the history
Co-authored-by: QihengZhou <[email protected]>
Co-authored-by: Li2CO3 <[email protected]>
Co-authored-by: Felix021 <[email protected]>
Co-authored-by: Joway <[email protected]>
Co-authored-by: qiheng.zhou <[email protected]>
Co-authored-by: kinggo <[email protected]>
Co-authored-by: Jayant <[email protected]>
Co-authored-by: Z.Q.K <[email protected]>
  • Loading branch information
9 people authored Sep 27, 2023
1 parent 07015dc commit 8bdd3ad
Show file tree
Hide file tree
Showing 51 changed files with 958 additions and 300 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,31 @@ jobs:
go-version: ${{ matrix.go }}
- name: Unit Test
run: go test -gcflags=-l -race -covermode=atomic ./...

codegen-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.17'
- name: Prepare
run: |
go install github.com/cloudwego/thriftgo@main
go install ./tool/cmd/kitex
LOCAL_REPO=$(pwd)
cd ..
git clone https://github.com/cloudwego/kitex-tests.git
cd kitex-tests/codegen
go mod init codegen-test
go mod edit -replace=github.com/apache/thrift=github.com/apache/[email protected]
go mod edit -replace github.com/cloudwego/kitex=${LOCAL_REPO}
go mod tidy
bash -version
bash ./codegen_install_check.sh
- name: CodeGen
run: |
cd ../kitex-tests/codegen
tree
bash ./codegen_run.sh
28 changes: 18 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ func richMWsWithBuilder(ctx context.Context, mwBs []endpoint.MiddlewareBuilder)
}

// initRPCInfo initializes the RPCInfo structure and attaches it to context.
func (kc *kClient) initRPCInfo(ctx context.Context, method string, retryTimes int) (context.Context, rpcinfo.RPCInfo, *callopt.CallOptions) {
return initRPCInfo(ctx, method, kc.opt, kc.svcInfo, retryTimes)
func (kc *kClient) initRPCInfo(ctx context.Context, method string, retryTimes int, firstRI rpcinfo.RPCInfo) (context.Context, rpcinfo.RPCInfo, *callopt.CallOptions) {
return initRPCInfo(ctx, method, kc.opt, kc.svcInfo, retryTimes, firstRI)
}

func applyCallOptions(ctx context.Context, cfg rpcinfo.MutableRPCConfig, svr remoteinfo.RemoteInfo, opt *client.Options) (context.Context, *callopt.CallOptions) {
Expand All @@ -325,10 +325,7 @@ func (kc *kClient) Call(ctx context.Context, method string, request, response in
validateForCall(ctx, kc.inited, kc.closed)
var ri rpcinfo.RPCInfo
var callOpts *callopt.CallOptions
ctx, ri, callOpts = kc.initRPCInfo(ctx, method, 0)
if callOpts != nil && callOpts.CompressorName != "" {
ctx = remote.SetSendCompressor(ctx, callOpts.CompressorName)
}
ctx, ri, callOpts = kc.initRPCInfo(ctx, method, 0, nil)

ctx = kc.opt.TracerCtl.DoStart(ctx, ri)
var reportErr error
Expand Down Expand Up @@ -364,7 +361,7 @@ func (kc *kClient) Call(ctx context.Context, method string, request, response in
recycleRI = true
}
} else {
recycleRI, err = kc.opt.RetryContainer.WithRetryIfNeeded(ctx, callOptRetry, kc.rpcCallWithRetry(ri, method, request, response), ri, request)
ri, recycleRI, err = kc.opt.RetryContainer.WithRetryIfNeeded(ctx, callOptRetry, kc.rpcCallWithRetry(ri, method, request, response), ri, request)
}

// do fallback if with setup
Expand All @@ -381,7 +378,7 @@ func (kc *kClient) rpcCallWithRetry(ri rpcinfo.RPCInfo, method string, request,
currCallTimes := int(atomic.AddInt32(&callTimes, 1))
cRI := ri
if currCallTimes > 1 {
ctx, cRI, _ = kc.initRPCInfo(ctx, method, currCallTimes-1)
ctx, cRI, _ = kc.initRPCInfo(ctx, method, currCallTimes-1, ri)
ctx = metainfo.WithPersistentValue(ctx, retry.TransitKey, strconv.Itoa(currCallTimes-1))
if prevRI.Load() == nil {
prevRI.Store(ri)
Expand Down Expand Up @@ -671,12 +668,17 @@ func getFallbackPolicy(cliOptFB *fallback.Policy, callOpts *callopt.CallOptions)
return nil, false
}

func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInfo *serviceinfo.ServiceInfo, retryTimes int) (context.Context, rpcinfo.RPCInfo, *callopt.CallOptions) {
func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInfo *serviceinfo.ServiceInfo, retryTimes int, firstRI rpcinfo.RPCInfo) (context.Context, rpcinfo.RPCInfo, *callopt.CallOptions) {
cfg := rpcinfo.AsMutableRPCConfig(opt.Configs).Clone()
rmt := remoteinfo.NewRemoteInfo(opt.Svr, method)
var callOpts *callopt.CallOptions
ctx, callOpts = applyCallOptions(ctx, cfg, rmt, opt)
rpcStats := rpcinfo.AsMutableRPCStats(rpcinfo.NewRPCStats())
var rpcStats rpcinfo.MutableRPCStats
if firstRI != nil {
rpcStats = rpcinfo.AsMutableRPCStats(firstRI.Stats().CopyForRetry())
} else {
rpcStats = rpcinfo.AsMutableRPCStats(rpcinfo.NewRPCStats())
}
if opt.StatsLevel != nil {
rpcStats.SetLevel(*opt.StatsLevel)
}
Expand Down Expand Up @@ -712,5 +714,11 @@ func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInf
}

ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)

if callOpts != nil && callOpts.CompressorName != "" {
// set send grpc compressor at client to tell how to server decode
remote.SetSendCompressor(ri, callOpts.CompressorName)
}

return ctx, ri, callOpts
}
2 changes: 1 addition & 1 deletion client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func TestWithLongConnectionOption(t *testing.T) {
opts := client.NewOptions(options)
test.Assert(t, opts.PoolCfg.MaxIdleTimeout == 30*time.Second) // defaultMaxIdleTimeout
test.Assert(t, opts.PoolCfg.MaxIdlePerAddress == 1) // default
test.Assert(t, opts.PoolCfg.MaxIdleGlobal == 1) // default
test.Assert(t, opts.PoolCfg.MaxIdleGlobal == 1<<20) // default
}

func TestWithWarmingUpOption(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion client/service_inline.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cloudwego/kitex/client/callopt"
"github.com/cloudwego/kitex/internal/client"
internal_server "github.com/cloudwego/kitex/internal/server"
"github.com/cloudwego/kitex/pkg/consts"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand Down Expand Up @@ -125,7 +126,7 @@ func (kc *serviceInlineClient) initMiddlewares(ctx context.Context) {

// initRPCInfo initializes the RPCInfo structure and attaches it to context.
func (kc *serviceInlineClient) initRPCInfo(ctx context.Context, method string) (context.Context, rpcinfo.RPCInfo, *callopt.CallOptions) {
return initRPCInfo(ctx, method, kc.opt, kc.svcInfo, 0)
return initRPCInfo(ctx, method, kc.opt, kc.svcInfo, 0, nil)
}

// Call implements the Client interface .
Expand Down Expand Up @@ -207,6 +208,7 @@ func (kc *serviceInlineClient) constructServerRPCInfo(svrCtx, cliCtx context.Con
ink.SetServiceName(kc.svcInfo.ServiceName)
}
rpcinfo.AsMutableEndpointInfo(ri.To()).SetMethod(method)
svrCtx = context.WithValue(svrCtx, consts.CtxKeyMethod, method)
return svrCtx, ri
}

Expand Down
35 changes: 33 additions & 2 deletions client/service_inline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,22 @@ import (
"github.com/cloudwego/kitex/internal/mocks"
internal_server "github.com/cloudwego/kitex/internal/server"
"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/consts"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/rpcinfo/remoteinfo"
"github.com/cloudwego/kitex/pkg/serviceinfo"
)

type serverInitialInfoImpl struct{}
type serverInitialInfoImpl struct {
EndpointsFunc func(ctx context.Context, req, resp interface{}) (err error)
}

func (s serverInitialInfoImpl) Endpoints() endpoint.Endpoint {
if s.EndpointsFunc != nil {
return s.EndpointsFunc
}
return func(ctx context.Context, req, resp interface{}) (err error) {
return nil
}
Expand All @@ -67,8 +73,8 @@ func newMockServiceInlineClient(tb testing.TB, ctrl *gomock.Controller, extra ..
WithDestService("destService"),
}
opts = append(opts, extra...)

svcInfo := mocks.ServiceInfo()

cli, err := NewServiceInlineClient(svcInfo, newMockServerInitialInfo(), opts...)
test.Assert(tb, err == nil)

Expand Down Expand Up @@ -341,3 +347,28 @@ func TestServiceInlineClientFinalizer(t *testing.T) {
t.Logf("After second GC, allocation: %f Mb, Number of allocation: %d\n", secondGCHeapAlloc, secondGCHeapObjects)
test.Assert(t, secondGCHeapAlloc < firstGCHeapAlloc/2 && secondGCHeapObjects < firstGCHeapObjects/2)
}

func TestServiceInlineMethodKeyCall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mtd := mocks.MockMethod
opts := []Option{
WithTransHandlerFactory(newMockCliTransHandlerFactory(ctrl)),
WithResolver(resolver404(ctrl)),
WithDialer(newDialer(ctrl)),
WithDestService("destService"),
}
svcInfo := mocks.ServiceInfo()
s := serverInitialInfoImpl{}
s.EndpointsFunc = func(ctx context.Context, req, resp interface{}) (err error) {
test.Assert(t, ctx.Value(consts.CtxKeyMethod) == mtd)
return nil
}
cli, err := NewServiceInlineClient(svcInfo, s, opts...)
test.Assert(t, err == nil)
ctx := context.Background()
req := new(MockTStruct)
res := new(MockTStruct)
err = cli.Call(ctx, mtd, req, res)
test.Assert(t, err == nil, err)
}
2 changes: 1 addition & 1 deletion client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (kc *kClient) Stream(ctx context.Context, method string, request, response
panic("ctx is nil")
}
var ri rpcinfo.RPCInfo
ctx, ri, _ = kc.initRPCInfo(ctx, method, 0)
ctx, ri, _ = kc.initRPCInfo(ctx, method, 0, nil)

rpcinfo.AsMutableRPCConfig(ri.Config()).SetInteractionMode(rpcinfo.Streaming)
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ require (
github.com/cloudwego/configmanager v0.2.0
github.com/cloudwego/dynamicgo v0.1.3
github.com/cloudwego/fastpb v0.0.4
github.com/cloudwego/frugal v0.1.7
github.com/cloudwego/frugal v0.1.8
github.com/cloudwego/localsession v0.0.2
github.com/cloudwego/netpoll v0.4.1
github.com/cloudwego/netpoll v0.5.0
github.com/cloudwego/thriftgo v0.3.0
github.com/golang/mock v1.6.0
github.com/google/pprof v0.0.0-20220608213341-c488b8fa1db3
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ github.com/cloudwego/fastpb v0.0.4 h1:/ROVVfoFtpfc+1pkQLzGs+azjxUbSOsAqSY4tAAx4m
github.com/cloudwego/fastpb v0.0.4/go.mod h1:/V13XFTq2TUkxj2qWReV8MwfPC4NnPcy6FsrojnsSG0=
github.com/cloudwego/frugal v0.1.3/go.mod h1:b981ViPYdhI56aFYsoMjl9kv6yeqYSO+iEz2jrhkCgI=
github.com/cloudwego/frugal v0.1.6/go.mod h1:9ElktKsh5qd2zDBQ5ENhPSQV7F2dZ/mXlr1eaZGDBFs=
github.com/cloudwego/frugal v0.1.7 h1:Ggyk8mk0WrhBlM4g4RJxdOcVWJl/Hxbd8NJ19J8My6c=
github.com/cloudwego/frugal v0.1.7/go.mod h1:3VECBCSiTYwm3QApqHXjZB9NDH+8hUw7txxlr+6pPb4=
github.com/cloudwego/frugal v0.1.8 h1:MaJDRfvSnepsbUyMlQA9cySJ2+Y/we+r57tv5txx3sE=
github.com/cloudwego/frugal v0.1.8/go.mod h1:F0mLIWHymuQgh6r8N0owTA/ARv1B4SOiKa88tpOAfEU=
github.com/cloudwego/kitex v0.3.2/go.mod h1:/XD07VpUD9VQWmmoepASgZ6iw//vgWikVA9MpzLC5i0=
github.com/cloudwego/kitex v0.4.4/go.mod h1:3FcH5h9Qw+dhRljSzuGSpWuThttA8DvK0BsL7HUYydo=
github.com/cloudwego/kitex v0.6.1/go.mod h1:zI1GBrjT0qloTikcCfQTgxg3Ws+yQMyaChEEOcGNUvA=
Expand All @@ -62,8 +62,12 @@ github.com/cloudwego/localsession v0.0.2/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiX
github.com/cloudwego/netpoll v0.2.4/go.mod h1:1T2WVuQ+MQw6h6DpE45MohSvDTKdy2DlzCx2KsnPI4E=
github.com/cloudwego/netpoll v0.3.1/go.mod h1:1T2WVuQ+MQw6h6DpE45MohSvDTKdy2DlzCx2KsnPI4E=
github.com/cloudwego/netpoll v0.4.0/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/netpoll v0.4.1 h1:/pGsY7Rs09KqEXEniB9fcsEWfi1iY+66bKUO3/NO6hc=
github.com/cloudwego/netpoll v0.4.1/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/netpoll v0.4.2-0.20230913081710-1a27688e2033 h1:/VYzCYH+Brp8CW1u475U+mPS7lHv5ulKx0vFJbp3YZ0=
github.com/cloudwego/netpoll v0.4.2-0.20230913081710-1a27688e2033/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/netpoll v0.4.2-0.20230918061532-5719b5310f34 h1:AbZPQaXr7MzOiUf1OZauww5rjmBpeLlyhM+hD7UsCn8=
github.com/cloudwego/netpoll v0.4.2-0.20230918061532-5719b5310f34/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/netpoll v0.5.0 h1:oRrOp58cPCvK2QbMozZNDESvrxQaEHW2dCimmwH1lcU=
github.com/cloudwego/netpoll v0.5.0/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
github.com/cloudwego/thriftgo v0.1.2/go.mod h1:LzeafuLSiHA9JTiWC8TIMIq64iadeObgRUhmVG1OC/w=
github.com/cloudwego/thriftgo v0.2.4/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
github.com/cloudwego/thriftgo v0.2.7/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
Expand Down
6 changes: 3 additions & 3 deletions pkg/connpool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
defaultMaxIdleTimeout = 30 * time.Second
minMaxIdleTimeout = 2 * time.Second
maxMinIdlePerAddress = 5
defaultMaxIdleGlobal = 1 << 20 // no limit
)

// CheckPoolConfig to check invalid param.
Expand Down Expand Up @@ -58,9 +59,8 @@ func CheckPoolConfig(config IdleConfig) *IdleConfig {

// globalIdle
if config.MaxIdleGlobal <= 0 {
config.MaxIdleGlobal = 1
}
if config.MaxIdleGlobal < config.MaxIdlePerAddress {
config.MaxIdleGlobal = defaultMaxIdleGlobal
} else if config.MaxIdleGlobal < config.MaxIdlePerAddress {
config.MaxIdleGlobal = config.MaxIdlePerAddress
}
return &config
Expand Down
2 changes: 1 addition & 1 deletion pkg/connpool/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestCheckPoolConfig(t *testing.T) {
cfg = CheckPoolConfig(IdleConfig{MinIdlePerAddress: -1})
test.Assert(t, cfg.MinIdlePerAddress == 0)
test.Assert(t, cfg.MaxIdlePerAddress == 1)
test.Assert(t, cfg.MaxIdleGlobal == 1)
test.Assert(t, cfg.MaxIdleGlobal == defaultMaxIdleGlobal)
cfg = CheckPoolConfig(IdleConfig{MinIdlePerAddress: 1})
test.Assert(t, cfg.MinIdlePerAddress == 1)
cfg = CheckPoolConfig(IdleConfig{MinIdlePerAddress: maxMinIdlePerAddress + 1})
Expand Down
4 changes: 2 additions & 2 deletions pkg/remote/codec/header_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,9 @@ func (m *mockInst) Address() net.Addr {
return m.addr
}

func (m *mockInst) SetRemoteAddr(addr net.Addr) (ok bool) {
func (m *mockInst) RefreshInstanceWithAddr(addr net.Addr) discovery.Instance {
m.addr = addr
return true
return m
}

func (m *mockInst) Weight() int {
Expand Down
39 changes: 39 additions & 0 deletions pkg/remote/codec/protobuf/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package encoding

import (
"fmt"
"io"
"strings"
)
Expand Down Expand Up @@ -82,6 +83,44 @@ func GetCompressor(name string) Compressor {
return registeredCompressor[name]
}

// FindCompressorName returns the name of compressor that actually used.
// when cname is like "identity,deflate,gzip", only one compressor name should be returned.
func FindCompressorName(cname string) string {
compressor, _ := FindCompressor(cname)
if compressor != nil {
return compressor.Name()
}
return ""
}

// FindCompressor is used to search for compressors based on a given name, where the input name can be an array of compressor names.
func FindCompressor(cname string) (compressor Compressor, err error) {
// if cname is empty, it means there's no compressor
if cname == "" {
return nil, nil
}
// cname can be an array, such as "identity,deflate,gzip", which means there should be at least one compressor registered.
// found available compressors
var hasIdentity bool
for _, name := range strings.Split(strings.TrimSuffix(cname, ";"), ",") {
name = strings.TrimSpace(name)
if name == Identity {
hasIdentity = true
}
compressor = GetCompressor(name)
if compressor != nil {
break
}
}
if compressor == nil {
if hasIdentity {
return nil, nil
}
return nil, fmt.Errorf("no kitex compressor registered found for:%v", cname)
}
return compressor, nil
}

// Codec defines the interface gRPC uses to encode and decode messages. Note
// that implementations of this interface must be thread safe; a Codec's
// methods can be called from concurrent goroutines.
Expand Down
Loading

0 comments on commit 8bdd3ad

Please sign in to comment.