Skip to content

Commit

Permalink
Merge branch 'develop' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
ShawnJeffersonWang authored Oct 23, 2024
2 parents 45cb131 + 045c5ab commit 5bd8994
Show file tree
Hide file tree
Showing 36 changed files with 716 additions and 125 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
go install ./tool/cmd/kitex
LOCAL_REPO=$(pwd)
cd ..
git clone https://github.com/cloudwego/kitex-tests.git
git clone https://github.com/cloudwego/kitex-tests.git
cd kitex-tests/codegen
go mod init codegen-test
go mod edit -replace=github.com/apache/thrift=github.com/apache/[email protected]
Expand Down
5 changes: 5 additions & 0 deletions client/genericclient/generic_stream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,10 @@ func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
Extra: make(map[string]interface{}),
}
svcInfo.Extra["generic"] = true
if extra, ok := g.(generic.ExtraProvider); ok {
if extra.GetExtra(generic.CombineServiceKey) == "true" {
svcInfo.Extra["combine_service"] = true
}
}
return svcInfo
}
1 change: 1 addition & 0 deletions client/genericclient/generic_stream_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestGenericStreamService(t *testing.T) {

svcInfo := newClientStreamingServiceInfo(g)
test.Assert(t, svcInfo.Extra["generic"] == true)
test.Assert(t, svcInfo.Extra["combine_service"] == nil)
svcInfo.GenericMethod = func(name string) serviceinfo.MethodInfo {
return svcInfo.Methods[name]
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ require (
github.com/bytedance/gopkg v0.1.1
github.com/bytedance/sonic v1.12.2
github.com/cloudwego/configmanager v0.2.2
github.com/cloudwego/dynamicgo v0.4.0
github.com/cloudwego/dynamicgo v0.4.3-0.20241009082247-81665bf11089
github.com/cloudwego/fastpb v0.0.5
github.com/cloudwego/frugal v0.2.0
github.com/cloudwego/gopkg v0.1.2
github.com/cloudwego/localsession v0.0.2
github.com/cloudwego/localsession v0.1.1
github.com/cloudwego/netpoll v0.6.4
github.com/cloudwego/runtimex v0.1.0
github.com/cloudwego/thriftgo v0.3.17
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/configmanager v0.2.2 h1:sVrJB8gWYTlPV2OS3wcgJSO9F2/9Zbkmcm1Z7jempOU=
github.com/cloudwego/configmanager v0.2.2/go.mod h1:ppiyU+5TPLonE8qMVi/pFQk2eL3Q4P7d4hbiNJn6jwI=
github.com/cloudwego/dynamicgo v0.4.0 h1:wQqNRNiSQaLkbcn3sfpEJGZsz3xf8Il4P/3DcENsrFI=
github.com/cloudwego/dynamicgo v0.4.0/go.mod h1:zgWk2oz56EyH790LJSxrTz1j01GJBO964jJQ/y7qjJc=
github.com/cloudwego/dynamicgo v0.4.3-0.20241009082247-81665bf11089 h1:RBCbq1O2+U/POTgRCRxcC+YXOQS2BtAIYtLUHfNuhFM=
github.com/cloudwego/dynamicgo v0.4.3-0.20241009082247-81665bf11089/go.mod h1:zgWk2oz56EyH790LJSxrTz1j01GJBO964jJQ/y7qjJc=
github.com/cloudwego/fastpb v0.0.5 h1:vYnBPsfbAtU5TVz5+f9UTlmSCixG9F9vRwaqE0mZPZU=
github.com/cloudwego/fastpb v0.0.5/go.mod h1:Bho7aAKBUtT9RPD2cNVkTdx4yQumfSv3If7wYnm1izk=
github.com/cloudwego/frugal v0.2.0 h1:0ETSzQYoYqVvdl7EKjqJ9aJnDoG6TzvNKV3PMQiQTS8=
Expand All @@ -28,8 +28,8 @@ github.com/cloudwego/gopkg v0.1.2 h1:650t+RiZGht8qX+y0hl49JXJCuO44GhbGZuxDzr2PyI
github.com/cloudwego/gopkg v0.1.2/go.mod h1:WoNTdXDPdvL97cBmRUWXVGkh2l2UFmpd9BUvbW2r0Aw=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/cloudwego/localsession v0.0.2 h1:N9/IDtCPj1fCL9bCTP+DbXx3f40YjVYWcwkJG0YhQkY=
github.com/cloudwego/localsession v0.0.2/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8=
github.com/cloudwego/localsession v0.1.1 h1:tbK7laDVrYfFDXoBXo4uCGMAxU4qmz2dDm8d4BGBnDo=
github.com/cloudwego/localsession v0.1.1/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8=
github.com/cloudwego/netpoll v0.6.4 h1:z/dA4sOTUQof6zZIO4QNnLBXsDFFFEos9OOGloR6kno=
github.com/cloudwego/netpoll v0.6.4/go.mod h1:BtM+GjKTdwKoC8IOzD08/+8eEn2gYoiNLipFca6BVXQ=
github.com/cloudwego/runtimex v0.1.0 h1:HG+WxWoj5/CDChDZ7D99ROwvSMkuNXAqt6hnhTTZDiI=
Expand Down
65 changes: 41 additions & 24 deletions internal/wpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ import (
)

// Task is the function that the worker will execute.
type Task func()
type Task struct {
ctx context.Context
f func()
}

// Pool is a worker pool bind with some idle goroutines.
type Pool struct {
Expand Down Expand Up @@ -61,54 +64,68 @@ func (p *Pool) Size() int32 {
}

// Go creates/reuses a worker to run task.
func (p *Pool) Go(task Task) {
p.GoCtx(context.Background(), task)
func (p *Pool) Go(f func()) {
p.GoCtx(context.Background(), f)
}

// GoCtx creates/reuses a worker to run task.
func (p *Pool) GoCtx(ctx context.Context, task Task) {
func (p *Pool) GoCtx(ctx context.Context, f func()) {
t := Task{ctx: ctx, f: f}
select {
case p.tasks <- task:
case p.tasks <- t:
// reuse exist worker
return
default:
}

// create new worker
atomic.AddInt32(&p.size, 1)
go func() {
// single shot if p.size > p.maxIdle
if atomic.AddInt32(&p.size, 1) > p.maxIdle {
go func(t Task) {
defer func() {
if r := recover(); r != nil {
klog.Errorf("panic in wpool: error=%v: stack=%s", r, debug.Stack())
}
atomic.AddInt32(&p.size, -1)
}()
if profiler.IsEnabled(t.ctx) {
profiler.Tag(t.ctx)
t.f()
profiler.Untag(t.ctx)
} else {
t.f()
}
}(t)
return
}

// background goroutines for consuming tasks
go func(t Task) {
defer func() {
if r := recover(); r != nil {
klog.Errorf("panic in wpool: error=%v: stack=%s", r, debug.Stack())
}
atomic.AddInt32(&p.size, -1)
}()

profiler.Tag(ctx)
task()
profiler.Untag(ctx)

if atomic.LoadInt32(&p.size) > p.maxIdle {
return
}

// waiting for new task
idleTimer := time.NewTimer(p.maxIdleTime)
for {
if profiler.IsEnabled(t.ctx) {
profiler.Tag(t.ctx)
t.f()
profiler.Untag(t.ctx)
} else {
t.f()
}
idleTimer.Reset(p.maxIdleTime)
select {
case task = <-p.tasks:
profiler.Tag(ctx)
task()
profiler.Untag(ctx)
case t = <-p.tasks:
case <-idleTimer.C:
// worker exits
return
}

if !idleTimer.Stop() {
<-idleTimer.C
}
idleTimer.Reset(p.maxIdleTime)
}
}()
}(t)
}
13 changes: 13 additions & 0 deletions internal/wpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package wpool

import (
"context"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -55,3 +56,15 @@ func TestWPool(t *testing.T) {
}
test.Assert(t, p.Size() == 0)
}

func BenchmarkWPool(b *testing.B) {
maxIdleWorkers := runtime.GOMAXPROCS(0)
ctx := context.Background()
p := New(maxIdleWorkers, 10*time.Millisecond)
for i := 0; i < b.N; i++ {
p.GoCtx(ctx, func() {})
for int(p.Size()) > maxIdleWorkers {
runtime.Gosched()
}
}
}
28 changes: 16 additions & 12 deletions pkg/generic/descriptor/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ import (

var isGoTagAliasDisabled = os.Getenv("KITEX_GENERIC_GOTAG_ALIAS_DISABLED") == "True"

func init() {
if isGoTagAliasDisabled {
// disable go.tag for dynamicgo
dthrift.RemoveAnnotationMapper(dthrift.AnnoScopeField, "go.tag")
}
}

// FieldDescriptor idl field descriptor
type FieldDescriptor struct {
Name string // field name
Expand All @@ -47,11 +40,21 @@ type FieldDescriptor struct {
Type *TypeDescriptor
HTTPMapping HTTPMapping
ValueMapping ValueMapping
GoTagOpt *GoTagOption
}

type GoTagOption struct {
IsGoAliasDisabled bool
}

// FieldName return field name maybe with an alias
func (d *FieldDescriptor) FieldName() string {
if d.Alias != "" && !isGoTagAliasDisabled {
aliasDisabled := isGoTagAliasDisabled
if d.GoTagOpt != nil {
aliasDisabled = d.GoTagOpt.IsGoAliasDisabled
}

if d.Alias != "" && !aliasDisabled {
return d.Alias
}
return d.Name
Expand Down Expand Up @@ -98,10 +101,11 @@ type FunctionDescriptor struct {

// ServiceDescriptor idl service descriptor
type ServiceDescriptor struct {
Name string
Functions map[string]*FunctionDescriptor
Router Router
DynamicGoDsc *dthrift.ServiceDescriptor
Name string
Functions map[string]*FunctionDescriptor
Router Router
DynamicGoDsc *dthrift.ServiceDescriptor
IsCombinedServices bool
}

// LookupFunctionByMethod lookup function by method
Expand Down
27 changes: 27 additions & 0 deletions pkg/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ type Method struct {
StreamingMode serviceinfo.StreamingMode
}

// ExtraProvider provides extra info for generic
type ExtraProvider interface {
GetExtra(key string) string
}

const CombineServiceKey = "combine_service"

// BinaryThriftGeneric raw thrift binary Generic
func BinaryThriftGeneric() Generic {
return &binaryThriftGeneric{}
Expand Down Expand Up @@ -265,6 +272,10 @@ func (g *mapThriftGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}

func (g *mapThriftGeneric) GetExtra(key string) string {
return g.codec.extra[key]
}

type jsonThriftGeneric struct {
codec *jsonThriftCodec
}
Expand Down Expand Up @@ -297,6 +308,10 @@ func (g *jsonThriftGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}

func (g *jsonThriftGeneric) GetExtra(key string) string {
return g.codec.extra[key]
}

type jsonPbGeneric struct {
codec *jsonPbCodec
}
Expand Down Expand Up @@ -329,6 +344,10 @@ func (g *jsonPbGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}

func (g *jsonPbGeneric) GetExtra(key string) string {
return g.codec.extra[key]
}

type httpThriftGeneric struct {
codec *httpThriftCodec
}
Expand Down Expand Up @@ -361,6 +380,10 @@ func (g *httpThriftGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}

func (g *httpThriftGeneric) GetExtra(key string) string {
return g.codec.extra[key]
}

type httpPbThriftGeneric struct {
codec *httpPbThriftCodec
}
Expand Down Expand Up @@ -392,3 +415,7 @@ func (g *httpPbThriftGeneric) IDLServiceName() string {
func (g *httpPbThriftGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}

func (g *httpPbThriftGeneric) GetExtra(key string) string {
return g.codec.extra[key]
}
19 changes: 16 additions & 3 deletions pkg/generic/generic_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,27 @@ type Service interface {

// ServiceInfoWithGeneric create a generic ServiceInfo
func ServiceInfoWithGeneric(g Generic) *serviceinfo.ServiceInfo {
return newServiceInfo(g.PayloadCodecType(), g.MessageReaderWriter(), g.IDLServiceName(), true)
isCombinedServices := getIsCombinedServices(g)
return newServiceInfo(g.PayloadCodecType(), g.MessageReaderWriter(), g.IDLServiceName(), isCombinedServices, true)
}

func getIsCombinedServices(g Generic) bool {
if extra, ok := g.(ExtraProvider); ok {
if extra.GetExtra(CombineServiceKey) == "true" {
return true
}
}
return false
}

// Deprecated: Replaced by ServiceInfoWithGeneric, this method will be removed in v0.12.0
// ServiceInfo create a generic ServiceInfo
// TODO(marina.sakai): remove in v0.12.0
func ServiceInfo(pcType serviceinfo.PayloadCodec) *serviceinfo.ServiceInfo {
return newServiceInfo(pcType, nil, "", false)
return newServiceInfo(pcType, nil, "", false, false)
}

func newServiceInfo(pcType serviceinfo.PayloadCodec, messageReaderWriter interface{}, serviceName string, withGeneric bool) *serviceinfo.ServiceInfo {
func newServiceInfo(pcType serviceinfo.PayloadCodec, messageReaderWriter interface{}, serviceName string, isCombinedServices, withGeneric bool) *serviceinfo.ServiceInfo {
handlerType := (*Service)(nil)

methods, svcName := GetMethodInfo(messageReaderWriter, serviceName)
Expand All @@ -63,6 +73,9 @@ func newServiceInfo(pcType serviceinfo.PayloadCodec, messageReaderWriter interfa
Extra: make(map[string]interface{}),
}
svcInfo.Extra["generic"] = true
if isCombinedServices {
svcInfo.Extra["combine_service"] = true
}
// TODO(marina.sakai): remove in v0.12.0
if !withGeneric {
svcInfo.Extra[DeprecatedGenericServiceInfoAPIKey] = true
Expand Down
Loading

0 comments on commit 5bd8994

Please sign in to comment.