diff --git a/cmd/terway-controlplane/terway-controlplane.go b/cmd/terway-controlplane/terway-controlplane.go index a323eb23..3139255a 100644 --- a/cmd/terway-controlplane/terway-controlplane.go +++ b/cmd/terway-controlplane/terway-controlplane.go @@ -39,7 +39,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/util/flowcontrol" "k8s.io/klog/v2" "k8s.io/klog/v2/klogr" ctrl "sigs.k8s.io/controller-runtime" @@ -205,7 +204,7 @@ func main() { panic(err) } - aliyunClient, err := aliyun.New(clientSet, flowcontrol.NewTokenBucketRateLimiter(cfg.ReadOnlyQPS, cfg.ReadOnlyBurst), flowcontrol.NewTokenBucketRateLimiter(cfg.MutatingQPS, cfg.MutatingBurst)) + aliyunClient, err := aliyun.New(clientSet, aliyun.FromMap(cfg.RateLimit)) if err != nil { panic(err) } diff --git a/daemon/builder.go b/daemon/builder.go index 744dbd01..daebddb7 100644 --- a/daemon/builder.go +++ b/daemon/builder.go @@ -10,7 +10,6 @@ import ( "time" "github.com/samber/lo" - "k8s.io/client-go/util/flowcontrol" "github.com/AliyunContainerService/terway/pkg/aliyun/client" "github.com/AliyunContainerService/terway/pkg/aliyun/credential" @@ -159,9 +158,7 @@ func (b *NetworkServiceBuilder) setupAliyunClient() error { return err } - aliyunClient, err := client.New(clientSet, - flowcontrol.NewTokenBucketRateLimiter(8, 10), - flowcontrol.NewTokenBucketRateLimiter(4, 5)) + aliyunClient, err := client.New(clientSet, client.FromMap(b.config.RateLimit)) if err != nil { return err } diff --git a/examples/maxpods/maxpods.go b/examples/maxpods/maxpods.go index 493b9bb1..2b867270 100644 --- a/examples/maxpods/maxpods.go +++ b/examples/maxpods/maxpods.go @@ -7,7 +7,6 @@ import ( "log" "github.com/sirupsen/logrus" - "k8s.io/client-go/util/flowcontrol" "github.com/AliyunContainerService/terway/pkg/aliyun/client" "github.com/AliyunContainerService/terway/pkg/aliyun/credential" @@ -49,9 +48,7 @@ func main() { panic(err) } - api, err := client.New(c, - flowcontrol.NewTokenBucketRateLimiter(8, 10), - flowcontrol.NewTokenBucketRateLimiter(4, 5)) + api, err := client.New(c, nil) if err != nil { panic(err) } diff --git a/pkg/aliyun/client/ecs.go b/pkg/aliyun/client/ecs.go index 619366b2..f2fd9f0f 100644 --- a/pkg/aliyun/client/ecs.go +++ b/pkg/aliyun/client/ecs.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net/netip" - "strings" "time" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" @@ -12,7 +11,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/flowcontrol" logf "sigs.k8s.io/controller-runtime/pkg/log" apiErr "github.com/AliyunContainerService/terway/pkg/aliyun/client/errors" @@ -24,28 +22,39 @@ import ( var _ VPC = &OpenAPI{} var _ ECS = &OpenAPI{} +const ( + APICreateNetworkInterface = "CreateNetworkInterface" + APIDescribeNetworkInterfaces = "DescribeNetworkInterfaces" + APIAttachNetworkInterface = "AttachNetworkInterface" + APIDetachNetworkInterface = "DetachNetworkInterface" + APIDeleteNetworkInterface = "DeleteNetworkInterface" + APIAssignPrivateIPAddress = "AssignPrivateIpAddresses" + APIUnAssignPrivateIPAddresses = "UnAssignPrivateIpAddresses" + APIAssignIPv6Addresses = "AssignIpv6Addresses" + APIUnAssignIpv6Addresses = "UnAssignIpv6Addresses" + APIDescribeInstanceTypes = "DescribeInstanceTypes" +) + type OpenAPI struct { ClientSet credential.Client IdempotentKeyGen IdempotentKeyGen - ReadOnlyRateLimiter flowcontrol.RateLimiter - MutatingRateLimiter flowcontrol.RateLimiter + RateLimiter *RateLimiter Tracer trace.Tracer } -func New(c credential.Client, readOnly, mutating flowcontrol.RateLimiter) (*OpenAPI, error) { +func New(c credential.Client, cfg LimitConfig) (*OpenAPI, error) { return &OpenAPI{ - ClientSet: c, - IdempotentKeyGen: NewIdempotentKeyGenerator(), - ReadOnlyRateLimiter: readOnly, - MutatingRateLimiter: mutating, - Tracer: otel.Tracer("openAPI"), + ClientSet: c, + IdempotentKeyGen: NewIdempotentKeyGenerator(), + RateLimiter: NewRateLimiter(cfg), + Tracer: otel.Tracer("openAPI"), }, nil } func (a *OpenAPI) CreateNetworkInterface(ctx context.Context, opts ...CreateNetworkInterfaceOption) (*NetworkInterface, error) { - ctx, span := a.Tracer.Start(ctx, "CreateNetworkInterface") + ctx, span := a.Tracer.Start(ctx, APICreateNetworkInterface) defer span.End() option := &CreateNetworkInterfaceOptions{} @@ -65,10 +74,13 @@ func (a *OpenAPI) CreateNetworkInterface(ctx context.Context, opts ...CreateNetw ) err = wait.ExponentialBackoffWithContext(ctx, *option.Backoff, func(ctx context.Context) (bool, error) { - a.MutatingRateLimiter.Accept() + err = a.RateLimiter.Wait(ctx, APICreateNetworkInterface) + if err != nil { + return false, err + } start := time.Now() resp, innerErr = a.ClientSet.ECS().CreateNetworkInterface(req) - metric.OpenAPILatency.WithLabelValues("CreateNetworkInterface", fmt.Sprint(innerErr != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APICreateNetworkInterface, fmt.Sprint(innerErr != nil)).Observe(metric.MsSince(start)) if innerErr != nil { innerErr = apiErr.WarpError(innerErr) l.WithValues(LogFieldRequestID, apiErr.ErrRequestID(innerErr)).Error(innerErr, "failed") @@ -93,7 +105,7 @@ func (a *OpenAPI) CreateNetworkInterface(ctx context.Context, opts ...CreateNetw // DescribeNetworkInterface list eni func (a *OpenAPI) DescribeNetworkInterface(ctx context.Context, vpcID string, eniID []string, instanceID string, instanceType string, status string, tags map[string]string) ([]*NetworkInterface, error) { - ctx, span := a.Tracer.Start(ctx, "DescribeNetworkInterface") + ctx, span := a.Tracer.Start(ctx, APIDescribeNetworkInterfaces) defer span.End() var result []*NetworkInterface @@ -108,6 +120,11 @@ func (a *OpenAPI) DescribeNetworkInterface(ctx context.Context, vpcID string, en } for { + err := a.RateLimiter.Wait(ctx, APIDescribeNetworkInterfaces) + if err != nil { + return nil, err + } + req := ecs.CreateDescribeNetworkInterfacesRequest() req.NextToken = nextToken req.VpcId = vpcID @@ -123,10 +140,9 @@ func (a *OpenAPI) DescribeNetworkInterface(ctx context.Context, vpcID string, en l := LogFields(logf.FromContext(ctx), req) - a.ReadOnlyRateLimiter.Accept() start := time.Now() resp, err := a.ClientSet.ECS().DescribeNetworkInterfaces(req) - metric.OpenAPILatency.WithLabelValues("DescribeNetworkInterfaces", fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIDescribeNetworkInterfaces, fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) if err != nil { err = apiErr.WarpError(err) l.WithValues(LogFieldRequestID, apiErr.ErrRequestID(err)).Error(err, "error describe eni") @@ -151,22 +167,24 @@ func (a *OpenAPI) DescribeNetworkInterface(ctx context.Context, vpcID string, en // AttachNetworkInterface attach eni func (a *OpenAPI) AttachNetworkInterface(ctx context.Context, eniID, instanceID, trunkENIID string) error { - ctx, span := a.Tracer.Start(ctx, "AttachNetworkInterface") + ctx, span := a.Tracer.Start(ctx, APIAttachNetworkInterface) defer span.End() + err := a.RateLimiter.Wait(ctx, APIAttachNetworkInterface) + if err != nil { + return err + } + req := ecs.CreateAttachNetworkInterfaceRequest() req.NetworkInterfaceId = eniID req.InstanceId = instanceID req.TrunkNetworkInstanceId = trunkENIID - l := logf.FromContext(ctx).WithValues(LogFieldAPI, "AttachNetworkInterface", - LogFieldENIID, eniID, - LogFieldInstanceID, instanceID) + l := LogFields(logf.FromContext(ctx), req) - a.MutatingRateLimiter.Accept() start := time.Now() resp, err := a.ClientSet.ECS().AttachNetworkInterface(req) - metric.OpenAPILatency.WithLabelValues("AttachNetworkInterface", fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIAttachNetworkInterface, fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) if err != nil { err = apiErr.WarpError(err) l.WithValues(LogFieldRequestID, apiErr.ErrRequestID(err)).Error(err, "attach eni failed") @@ -178,7 +196,7 @@ func (a *OpenAPI) AttachNetworkInterface(ctx context.Context, eniID, instanceID, // DetachNetworkInterface detach eni func (a *OpenAPI) DetachNetworkInterface(ctx context.Context, eniID, instanceID, trunkENIID string) error { - ctx, span := a.Tracer.Start(ctx, "DetachNetworkInterface") + ctx, span := a.Tracer.Start(ctx, APIDetachNetworkInterface) defer span.End() req := ecs.CreateDetachNetworkInterfaceRequest() @@ -186,15 +204,15 @@ func (a *OpenAPI) DetachNetworkInterface(ctx context.Context, eniID, instanceID, req.InstanceId = instanceID req.TrunkNetworkInstanceId = trunkENIID - l := logf.FromContext(ctx).WithValues( - LogFieldAPI, "DetachNetworkInterface", - LogFieldENIID, eniID, - LogFieldInstanceID, instanceID, - ) - a.MutatingRateLimiter.Accept() + l := LogFields(logf.FromContext(ctx), req) + + err := a.RateLimiter.Wait(ctx, APIDetachNetworkInterface) + if err != nil { + return err + } start := time.Now() resp, err := a.ClientSet.ECS().DetachNetworkInterface(req) - metric.OpenAPILatency.WithLabelValues("DetachNetworkInterface", fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIDetachNetworkInterface, fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) if err != nil { err = apiErr.WarpError(err) if apiErr.ErrorCodeIs(err, apiErr.ErrInvalidENINotFound, apiErr.ErrInvalidEcsIDNotFound) { @@ -209,20 +227,21 @@ func (a *OpenAPI) DetachNetworkInterface(ctx context.Context, eniID, instanceID, // DeleteNetworkInterface del eni by id func (a *OpenAPI) DeleteNetworkInterface(ctx context.Context, eniID string) error { - ctx, span := a.Tracer.Start(ctx, "DeleteNetworkInterface") + ctx, span := a.Tracer.Start(ctx, APIDeleteNetworkInterface) defer span.End() req := ecs.CreateDeleteNetworkInterfaceRequest() req.NetworkInterfaceId = eniID - l := logf.FromContext(ctx).WithValues( - LogFieldAPI, "DeleteNetworkInterface", - LogFieldENIID, eniID, - ) - a.MutatingRateLimiter.Accept() + l := LogFields(logf.FromContext(ctx), req) + + err := a.RateLimiter.Wait(ctx, APIDeleteNetworkInterface) + if err != nil { + return err + } start := time.Now() resp, err := a.ClientSet.ECS().DeleteNetworkInterface(req) - metric.OpenAPILatency.WithLabelValues("DeleteNetworkInterface", fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIDeleteNetworkInterface, fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) if err != nil { err = apiErr.WarpError(err) l.WithValues(LogFieldRequestID, apiErr.ErrRequestID(err)).Error(err, "delete eni failed") @@ -268,7 +287,7 @@ func (a *OpenAPI) WaitForNetworkInterface(ctx context.Context, eniID string, sta } func (a *OpenAPI) AssignPrivateIPAddress(ctx context.Context, opts ...AssignPrivateIPAddressOption) ([]netip.Addr, error) { - ctx, span := a.Tracer.Start(ctx, "AssignPrivateIPAddress") + ctx, span := a.Tracer.Start(ctx, APIAssignPrivateIPAddress) defer span.End() option := &AssignPrivateIPAddressOptions{} @@ -288,10 +307,13 @@ func (a *OpenAPI) AssignPrivateIPAddress(ctx context.Context, opts ...AssignPriv ) err = wait.ExponentialBackoffWithContext(ctx, *option.Backoff, func(ctx context.Context) (bool, error) { - a.MutatingRateLimiter.Accept() + innerErr = a.RateLimiter.Wait(ctx, APIAssignPrivateIPAddress) + if innerErr != nil { + return true, innerErr + } start := time.Now() resp, innerErr = a.ClientSet.ECS().AssignPrivateIpAddresses(req) - metric.OpenAPILatency.WithLabelValues("AssignPrivateIpAddresses", fmt.Sprint(innerErr != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIAssignPrivateIPAddress, fmt.Sprint(innerErr != nil)).Observe(metric.MsSince(start)) if innerErr != nil { innerErr = apiErr.WarpError(innerErr) l.WithValues(LogFieldRequestID, apiErr.ErrRequestID(innerErr)).Error(innerErr, "failed") @@ -324,22 +346,24 @@ func (a *OpenAPI) UnAssignPrivateIPAddresses(ctx context.Context, eniID string, return nil } - ctx, span := a.Tracer.Start(ctx, "UnAssignPrivateIPAddresses") + ctx, span := a.Tracer.Start(ctx, APIUnAssignPrivateIPAddresses) defer span.End() + err := a.RateLimiter.Wait(ctx, APIUnAssignPrivateIPAddresses) + if err != nil { + return err + } + req := ecs.CreateUnassignPrivateIpAddressesRequest() req.NetworkInterfaceId = eniID str := ip.IPAddrs2str(ips) req.PrivateIpAddress = &str - l := logf.FromContext(ctx).WithValues( - LogFieldAPI, "UnassignPrivateIpAddresses", - LogFieldENIID, eniID, - LogFieldIPs, strings.Join(str, ","), - ) + l := LogFields(logf.FromContext(ctx), req) + start := time.Now() resp, err := a.ClientSet.ECS().UnassignPrivateIpAddresses(req) - metric.OpenAPILatency.WithLabelValues("UnassignPrivateIpAddresses", fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIUnAssignPrivateIPAddresses, fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) if err != nil { err = apiErr.WarpError(err) @@ -357,7 +381,7 @@ func (a *OpenAPI) UnAssignPrivateIPAddresses(ctx context.Context, eniID string, // AssignIpv6Addresses assign ipv6 address func (a *OpenAPI) AssignIpv6Addresses(ctx context.Context, opts ...AssignIPv6AddressesOption) ([]netip.Addr, error) { - ctx, span := a.Tracer.Start(ctx, "AssignIpv6Addresses") + ctx, span := a.Tracer.Start(ctx, APIAssignIPv6Addresses) defer span.End() option := &AssignIPv6AddressesOptions{} @@ -377,10 +401,14 @@ func (a *OpenAPI) AssignIpv6Addresses(ctx context.Context, opts ...AssignIPv6Add ) err = wait.ExponentialBackoffWithContext(ctx, *option.Backoff, func(ctx context.Context) (bool, error) { - a.MutatingRateLimiter.Accept() + innerErr = a.RateLimiter.Wait(ctx, APIAssignIPv6Addresses) + if err != nil { + return true, innerErr + } + start := time.Now() resp, innerErr = a.ClientSet.ECS().AssignIpv6Addresses(req) - metric.OpenAPILatency.WithLabelValues("AssignIpv6Addresses", fmt.Sprint(innerErr != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIAssignIPv6Addresses, fmt.Sprint(innerErr != nil)).Observe(metric.MsSince(start)) if innerErr != nil { innerErr = apiErr.WarpError(innerErr) l.WithValues(LogFieldRequestID, apiErr.ErrRequestID(innerErr)).Error(innerErr, "failed") @@ -408,25 +436,28 @@ func (a *OpenAPI) AssignIpv6Addresses(ctx context.Context, opts ...AssignIPv6Add // UnAssignIpv6Addresses remove ip from eni // return ok if 1. eni is released 2. ip is already released 3. release success func (a *OpenAPI) UnAssignIpv6Addresses(ctx context.Context, eniID string, ips []netip.Addr) error { - ctx, span := a.Tracer.Start(ctx, "UnAssignIpv6Addresses") + ctx, span := a.Tracer.Start(ctx, APIUnAssignIpv6Addresses) defer span.End() if len(ips) == 0 { return nil } + + err := a.RateLimiter.Wait(ctx, APIUnAssignIpv6Addresses) + if err != nil { + return err + } + req := ecs.CreateUnassignIpv6AddressesRequest() req.NetworkInterfaceId = eniID str := ip.IPAddrs2str(ips) req.Ipv6Address = &str - l := logf.FromContext(ctx).WithValues( - LogFieldAPI, "UnassignIpv6Addresses", - LogFieldENIID, eniID, - LogFieldIPs, strings.Join(str, ","), - ) + l := LogFields(logf.FromContext(ctx), req) + start := time.Now() resp, err := a.ClientSet.ECS().UnassignIpv6Addresses(req) - metric.OpenAPILatency.WithLabelValues("UnassignIpv6Addresses", fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIUnAssignIpv6Addresses, fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) if err != nil { err = apiErr.WarpError(err) @@ -443,13 +474,18 @@ func (a *OpenAPI) UnAssignIpv6Addresses(ctx context.Context, eniID string, ips [ } func (a *OpenAPI) DescribeInstanceTypes(ctx context.Context, types []string) ([]ecs.InstanceType, error) { - ctx, span := a.Tracer.Start(ctx, "DescribeInstanceTypes") + ctx, span := a.Tracer.Start(ctx, APIDescribeInstanceTypes) defer span.End() var result []ecs.InstanceType nextToken := "" for { + err := a.RateLimiter.Wait(ctx, APIDescribeInstanceTypes) + if err != nil { + return nil, err + } + req := ecs.CreateDescribeInstanceTypesRequest() req.NextToken = nextToken // nb(l1b0k): see https://help.aliyun.com/practice_detail/461278. @@ -459,7 +495,7 @@ func (a *OpenAPI) DescribeInstanceTypes(ctx context.Context, types []string) ([] } start := time.Now() resp, err := a.ClientSet.ECS().DescribeInstanceTypes(req) - metric.OpenAPILatency.WithLabelValues("DescribeInstanceTypes", fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIDescribeInstanceTypes, fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) l := LogFields(logf.FromContext(ctx), req) @@ -480,26 +516,3 @@ func (a *OpenAPI) DescribeInstanceTypes(ctx context.Context, types []string) ([] return result, nil } - -func (a *OpenAPI) ModifyNetworkInterfaceAttribute(ctx context.Context, eniID string, securityGroupIDs []string) error { - ctx, span := a.Tracer.Start(ctx, "ModifyNetworkInterfaceAttribute") - defer span.End() - - req := ecs.CreateModifyNetworkInterfaceAttributeRequest() - req.NetworkInterfaceId = eniID - req.SecurityGroupId = &securityGroupIDs - start := time.Now() - resp, err := a.ClientSet.ECS().ModifyNetworkInterfaceAttribute(req) - metric.OpenAPILatency.WithLabelValues("ModifyNetworkInterfaceAttribute", fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) - - l := logf.FromContext(ctx).WithValues( - LogFieldAPI, "ModifyNetworkInterfaceAttribute", - ) - if err != nil { - err = apiErr.WarpError(err) - l.WithValues(LogFieldRequestID, apiErr.ErrRequestID(err)).Error(err, "modify securityGroup failed") - return err - } - l.WithValues(LogFieldRequestID, resp.RequestId).Info("modify securityGroup", "ids", strings.Join(securityGroupIDs, ",")) - return nil -} diff --git a/pkg/aliyun/client/interface_default.go b/pkg/aliyun/client/interface_default.go index b3522d58..1da0d363 100644 --- a/pkg/aliyun/client/interface_default.go +++ b/pkg/aliyun/client/interface_default.go @@ -23,6 +23,5 @@ type ECS interface { UnAssignPrivateIPAddresses(ctx context.Context, eniID string, ips []netip.Addr) error AssignIpv6Addresses(ctx context.Context, opts ...AssignIPv6AddressesOption) ([]netip.Addr, error) UnAssignIpv6Addresses(ctx context.Context, eniID string, ips []netip.Addr) error - ModifyNetworkInterfaceAttribute(ctx context.Context, eniID string, securityGroupIDs []string) error DescribeInstanceTypes(ctx context.Context, types []string) ([]ecs.InstanceType, error) } diff --git a/pkg/aliyun/client/mocks/ECS.go b/pkg/aliyun/client/mocks/ECS.go index 26824580..b5cf92a6 100644 --- a/pkg/aliyun/client/mocks/ECS.go +++ b/pkg/aliyun/client/mocks/ECS.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.2. DO NOT EDIT. +// Code generated by mockery v2.43.2. DO NOT EDIT. package mocks @@ -246,24 +246,6 @@ func (_m *ECS) DetachNetworkInterface(ctx context.Context, eniID string, instanc return r0 } -// ModifyNetworkInterfaceAttribute provides a mock function with given fields: ctx, eniID, securityGroupIDs -func (_m *ECS) ModifyNetworkInterfaceAttribute(ctx context.Context, eniID string, securityGroupIDs []string) error { - ret := _m.Called(ctx, eniID, securityGroupIDs) - - if len(ret) == 0 { - panic("no return value specified for ModifyNetworkInterfaceAttribute") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, []string) error); ok { - r0 = rf(ctx, eniID, securityGroupIDs) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // UnAssignIpv6Addresses provides a mock function with given fields: ctx, eniID, ips func (_m *ECS) UnAssignIpv6Addresses(ctx context.Context, eniID string, ips []netip.Addr) error { ret := _m.Called(ctx, eniID, ips) diff --git a/pkg/aliyun/client/ratelimit.go b/pkg/aliyun/client/ratelimit.go new file mode 100644 index 00000000..d7cc5faa --- /dev/null +++ b/pkg/aliyun/client/ratelimit.go @@ -0,0 +1,84 @@ +package client + +import ( + "context" + "time" + + "golang.org/x/time/rate" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/AliyunContainerService/terway/pkg/metric" +) + +type LimitConfig map[string]Limit + +type Limit struct { + QPS float64 + Burst int +} + +var defaultLimit = map[string]int{ + "": 500, + "AttachNetworkInterface": 500, + "CreateNetworkInterface": 500, + "DeleteNetworkInterface": 500, + "DescribeNetworkInterfaces": 800, + "DetachNetworkInterface": 400, + "AssignPrivateIpAddresses": 400, + "UnassignPrivateIpAddresses": 400, + "AssignIpv6Addresses": 400, + "UnassignIpv6Addresses": 400, + "DescribeInstanceTypes": 400, + "DescribeVSwitches": 300, +} + +const ( + longThrottleLatency = 5 * time.Second +) + +func FromMap(in map[string]int) LimitConfig { + l := make(LimitConfig) + for k, v := range in { + l[k] = Limit{ + QPS: float64(v) / 60, + Burst: v, + } + } + return l +} + +type RateLimiter struct { + store map[string]*rate.Limiter +} + +func NewRateLimiter(cfg LimitConfig) *RateLimiter { + r := &RateLimiter{ + store: make(map[string]*rate.Limiter), + } + for k, v := range defaultLimit { + r.store[k] = rate.NewLimiter(rate.Limit(float64(v)/60), v) + } + for k, v := range cfg { + r.store[k] = rate.NewLimiter(rate.Limit(v.QPS), v.Burst) + } + + return r +} + +func (r *RateLimiter) Wait(ctx context.Context, name string) error { + start := time.Now() + defer func() { + took := time.Since(start) + metric.RateLimiterLatency.WithLabelValues(name).Observe(float64(took.Milliseconds())) + + if took >= longThrottleLatency { + l := logf.FromContext(ctx) + l.Info("client rate limit", "api", name, "took", took.Seconds()) + } + }() + v, ok := r.store[name] + if ok { + return v.Wait(ctx) + } + return r.store[""].Wait(ctx) +} diff --git a/pkg/aliyun/client/ratelimit_test.go b/pkg/aliyun/client/ratelimit_test.go new file mode 100644 index 00000000..0951928f --- /dev/null +++ b/pkg/aliyun/client/ratelimit_test.go @@ -0,0 +1,74 @@ +package client + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewRateLimiter(t *testing.T) { + type args struct { + cfg LimitConfig + } + tests := []struct { + name string + args args + checkFunc func(t *testing.T, r *RateLimiter) + }{ + { + name: "test default", + args: args{ + cfg: nil, + }, + checkFunc: func(t *testing.T, r *RateLimiter) { + assert.Equal(t, 400, r.store["DescribeInstanceTypes"].Burst()) + }, + }, + { + name: "test override", + args: args{ + cfg: map[string]Limit{ + "DescribeInstanceTypes": { + QPS: float64(600 / 60), + Burst: 600, + }, + }, + }, + checkFunc: func(t *testing.T, r *RateLimiter) { + assert.Equal(t, 600, r.store["DescribeInstanceTypes"].Burst()) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.checkFunc(t, NewRateLimiter(tt.args.cfg)) + }) + } +} + +func TestRateLimiter_Wait(t *testing.T) { + r := NewRateLimiter(map[string]Limit{ + "foo": { + QPS: 1, + Burst: 1, + }, + }) + + start := time.Now() + wg := sync.WaitGroup{} + + for i := 0; i < 2; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + err := r.Wait(context.Background(), "foo") + assert.NoError(t, err) + }() + } + wg.Wait() + assert.True(t, 1*time.Second < time.Since(start)) +} diff --git a/pkg/aliyun/client/vsw_default.go b/pkg/aliyun/client/vsw_default.go index 58269c81..37760d10 100644 --- a/pkg/aliyun/client/vsw_default.go +++ b/pkg/aliyun/client/vsw_default.go @@ -14,22 +14,28 @@ import ( "github.com/AliyunContainerService/terway/pkg/metric" ) +const ( + APIDescribeVSwitches = "DescribeVSwitches" +) + // DescribeVSwitchByID get vsw by id func (a *OpenAPI) DescribeVSwitchByID(ctx context.Context, vSwitchID string) (*vpc.VSwitch, error) { - ctx, span := a.Tracer.Start(ctx, "DescribeVSwitchByID") + ctx, span := a.Tracer.Start(ctx, APIDescribeVSwitches) defer span.End() + err := a.RateLimiter.Wait(ctx, APIDescribeInstanceTypes) + if err != nil { + return nil, err + } + req := vpc.CreateDescribeVSwitchesRequest() req.VSwitchId = vSwitchID - l := logf.FromContext(ctx).WithValues( - LogFieldAPI, "DescribeVSwitches", - LogFieldVSwitchID, vSwitchID, - ) + l := LogFields(logf.FromContext(ctx), req) start := time.Now() resp, err := a.ClientSet.VPC().DescribeVSwitches(req) - metric.OpenAPILatency.WithLabelValues("DescribeVSwitches", fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) + metric.OpenAPILatency.WithLabelValues(APIDescribeVSwitches, fmt.Sprint(err != nil)).Observe(metric.MsSince(start)) if err != nil { err = apiErr.WarpError(err) l.WithValues(LogFieldRequestID, apiErr.ErrRequestID(err)).Error(err, "DescribeVSwitches failed") diff --git a/pkg/metric/aliyun.go b/pkg/metric/aliyun.go index b2fec491..0535538b 100644 --- a/pkg/metric/aliyun.go +++ b/pkg/metric/aliyun.go @@ -21,4 +21,13 @@ var ( }, []string{"url", "error"}, ) + + RateLimiterLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "rate_limiter_latency", + Help: "rate_limiter_latency in ms", + Buckets: []float64{200, 400, 800, 1600, 3200, 6400, 12800, 13800, 14800, 16800, 20800, 28800, 44800}, + }, + []string{"api"}, + ) ) diff --git a/types/controlplane/config_default.go b/types/controlplane/config_default.go index 01643891..8e9b88a7 100644 --- a/types/controlplane/config_default.go +++ b/types/controlplane/config_default.go @@ -60,11 +60,6 @@ type Config struct { KubeClientQPS float32 `json:"kubeClientQPS" validate:"gt=0,lte=10000" mod:"default=20"` KubeClientBurst int `json:"kubeClientBurst" validate:"gt=0,lte=10000" mod:"default=30"` - ReadOnlyQPS float32 `json:"readOnlyQPS" validate:"gt=0,lte=10000" mod:"default=8"` - ReadOnlyBurst int `json:"readOnlyBurst" validate:"gt=0,lte=10000" mod:"default=10"` - MutatingQPS float32 `json:"mutatingQPS" validate:"gt=0,lte=10000" mod:"default=4"` - MutatingBurst int `json:"mutatingBurst" validate:"gt=0,lte=10000" mod:"default=5"` - VSwitchPoolSize int `json:"vSwitchPoolSize" validate:"gt=0" mod:"default=1000"` VSwitchCacheTTL string `json:"vSwitchCacheTTL" mod:"default=20m0s"` @@ -73,6 +68,8 @@ type Config struct { BackoffOverride map[string]wait.Backoff `json:"backoffOverride,omitempty"` IPAMType string `json:"ipamType"` + RateLimit map[string]int `json:"rateLimit"` + Credential } diff --git a/types/daemon/config.go b/types/daemon/config.go index 3b307aea..affdab4c 100644 --- a/types/daemon/config.go +++ b/types/daemon/config.go @@ -58,6 +58,7 @@ type Config struct { KubeClientQPS float32 `json:"kube_client_qps"` KubeClientBurst int `json:"kube_client_burst"` ResourceGroupID string `json:"resource_group_id"` + RateLimit map[string]int `json:"rate_limit"` } func (c *Config) GetSecurityGroups() []string {