Skip to content

Commit

Permalink
feat:envoy ratelimit action suppoer all spec label & add hds feature (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Sep 24, 2023
1 parent e7f2bfc commit d60fc05
Show file tree
Hide file tree
Showing 54 changed files with 1,660 additions and 660 deletions.
45 changes: 0 additions & 45 deletions apiserver/grpcserver/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@ import (
"net"
"net/http"
"runtime"
"strings"
"time"

apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

api "github.com/polarismesh/polaris/common/api/v1"
Expand Down Expand Up @@ -438,48 +435,6 @@ func (b *BaseGrpcServer) AllowAccess(method string) bool {
return ok
}

// ConvertContext 将GRPC上下文转换成内部上下文
func ConvertContext(ctx context.Context) context.Context {
var (
requestID = ""
userAgent = ""
)
meta, exist := metadata.FromIncomingContext(ctx)
if exist {
ids := meta["request-id"]
if len(ids) > 0 {
requestID = ids[0]
}
agents := meta["user-agent"]
if len(agents) > 0 {
userAgent = agents[0]
}
} else {
meta = metadata.MD{}
}

var (
clientIP = ""
address = ""
)
if pr, ok := peer.FromContext(ctx); ok && pr.Addr != nil {
address = pr.Addr.String()
addrSlice := strings.Split(address, ":")
if len(addrSlice) == 2 {
clientIP = addrSlice[0]
}
}

ctx = context.Background()
ctx = context.WithValue(ctx, utils.ContextGrpcHeader, meta)
ctx = context.WithValue(ctx, utils.StringContext("request-id"), requestID)
ctx = context.WithValue(ctx, utils.StringContext("client-ip"), clientIP)
ctx = context.WithValue(ctx, utils.ContextClientAddress, address)
ctx = context.WithValue(ctx, utils.StringContext("user-agent"), userAgent)

return ctx
}

type connCounterHook struct {
bz model.BzModule
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/grpcserver/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestConvertContext(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ConvertContext(tt.args.ctx); !reflect.DeepEqual(got, tt.want) {
if got := utils.ConvertGRPCContext(tt.args.ctx); !reflect.DeepEqual(got, tt.want) {
t.Errorf("ConvertContext() = %v, \n want %v", got, tt.want)
}
})
Expand Down
13 changes: 6 additions & 7 deletions apiserver/grpcserver/config/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

apiconfig "github.com/polarismesh/specification/source/go/api/v1/config_manage"

"github.com/polarismesh/polaris/apiserver/grpcserver"
"github.com/polarismesh/polaris/common/metrics"
commontime "github.com/polarismesh/polaris/common/time"
"github.com/polarismesh/polaris/common/utils"
Expand All @@ -33,7 +32,7 @@ import (
// GetConfigFile 拉取配置
func (g *ConfigGRPCServer) GetConfigFile(ctx context.Context,
req *apiconfig.ClientConfigFileInfo) (*apiconfig.ConfigClientResponse, error) {
ctx = grpcserver.ConvertContext(ctx)
ctx = utils.ConvertGRPCContext(ctx)

startTime := commontime.CurrentMillisecond()
defer func() {
Expand All @@ -53,31 +52,31 @@ func (g *ConfigGRPCServer) GetConfigFile(ctx context.Context,
// CreateConfigFile 创建或更新配置
func (g *ConfigGRPCServer) CreateConfigFile(ctx context.Context,
configFile *apiconfig.ConfigFile) (*apiconfig.ConfigClientResponse, error) {
ctx = grpcserver.ConvertContext(ctx)
ctx = utils.ConvertGRPCContext(ctx)
response := g.configServer.CreateConfigFileFromClient(ctx, configFile)
return response, nil
}

// UpdateConfigFile 创建或更新配置
func (g *ConfigGRPCServer) UpdateConfigFile(ctx context.Context,
configFile *apiconfig.ConfigFile) (*apiconfig.ConfigClientResponse, error) {
ctx = grpcserver.ConvertContext(ctx)
ctx = utils.ConvertGRPCContext(ctx)
response := g.configServer.UpdateConfigFileFromClient(ctx, configFile)
return response, nil
}

// PublishConfigFile 发布配置
func (g *ConfigGRPCServer) PublishConfigFile(ctx context.Context,
configFile *apiconfig.ConfigFileRelease) (*apiconfig.ConfigClientResponse, error) {
ctx = grpcserver.ConvertContext(ctx)
ctx = utils.ConvertGRPCContext(ctx)
response := g.configServer.PublishConfigFileFromClient(ctx, configFile)
return response, nil
}

// WatchConfigFiles 订阅配置变更
func (g *ConfigGRPCServer) WatchConfigFiles(ctx context.Context,
request *apiconfig.ClientWatchConfigFileRequest) (*apiconfig.ConfigClientResponse, error) {
ctx = grpcserver.ConvertContext(ctx)
ctx = utils.ConvertGRPCContext(ctx)

// 阻塞等待响应
callback, err := g.configServer.WatchConfigFiles(ctx, request)
Expand All @@ -102,6 +101,6 @@ func (g *ConfigGRPCServer) GetConfigFileMetadataList(ctx context.Context,
})
}()

ctx = grpcserver.ConvertContext(ctx)
ctx = utils.ConvertGRPCContext(ctx)
return g.configServer.GetConfigFileNamesWithCache(ctx, req), nil
}
13 changes: 6 additions & 7 deletions apiserver/grpcserver/discover/v1/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"

"github.com/polarismesh/polaris/apiserver/grpcserver"
api "github.com/polarismesh/polaris/common/api/v1"
commonlog "github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/common/metrics"
Expand All @@ -45,13 +44,13 @@ var (

// ReportClient 客户端上报
func (g *DiscoverServer) ReportClient(ctx context.Context, in *apiservice.Client) (*apiservice.Response, error) {
return g.namingServer.ReportClient(grpcserver.ConvertContext(ctx), in), nil
return g.namingServer.ReportClient(utils.ConvertGRPCContext(ctx), in), nil
}

// RegisterInstance 注册服务实例
func (g *DiscoverServer) RegisterInstance(ctx context.Context, in *apiservice.Instance) (*apiservice.Response, error) {
// 需要记录操作来源,提高效率,只针对特殊接口添加operator
rCtx := grpcserver.ConvertContext(ctx)
rCtx := utils.ConvertGRPCContext(ctx)
rCtx = context.WithValue(rCtx, utils.StringContext("operator"), ParseGrpcOperator(ctx))

// 客户端请求中带了 token 的,优先已请求中的为准
Expand All @@ -73,7 +72,7 @@ func (g *DiscoverServer) RegisterInstance(ctx context.Context, in *apiservice.In
func (g *DiscoverServer) DeregisterInstance(
ctx context.Context, in *apiservice.Instance) (*apiservice.Response, error) {
// 需要记录操作来源,提高效率,只针对特殊接口添加operator
rCtx := grpcserver.ConvertContext(ctx)
rCtx := utils.ConvertGRPCContext(ctx)
rCtx = context.WithValue(rCtx, utils.StringContext("operator"), ParseGrpcOperator(ctx))

// 客户端请求中带了 token 的,优先已请求中的为准
Expand All @@ -87,7 +86,7 @@ func (g *DiscoverServer) DeregisterInstance(

// Discover 统一发现接口
func (g *DiscoverServer) Discover(server apiservice.PolarisGRPC_DiscoverServer) error {
ctx := grpcserver.ConvertContext(server.Context())
ctx := utils.ConvertGRPCContext(server.Context())
clientIP, _ := ctx.Value(utils.StringContext("client-ip")).(string)
clientAddress, _ := ctx.Value(utils.StringContext("client-address")).(string)
requestID, _ := ctx.Value(utils.StringContext("request-id")).(string)
Expand Down Expand Up @@ -167,12 +166,12 @@ func (g *DiscoverServer) Discover(server apiservice.PolarisGRPC_DiscoverServer)

// Heartbeat 上报心跳
func (g *DiscoverServer) Heartbeat(ctx context.Context, in *apiservice.Instance) (*apiservice.Response, error) {
return g.healthCheckServer.Report(grpcserver.ConvertContext(ctx), in), nil
return g.healthCheckServer.Report(utils.ConvertGRPCContext(ctx), in), nil
}

// BatchHeartbeat 批量上报心跳
func (g *DiscoverServer) BatchHeartbeat(svr apiservice.PolarisHeartbeatGRPC_BatchHeartbeatServer) error {
ctx := grpcserver.ConvertContext(svr.Context())
ctx := utils.ConvertGRPCContext(svr.Context())

for {
req, err := svr.Recv()
Expand Down
Loading

0 comments on commit d60fc05

Please sign in to comment.