Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:合并main分支部分feature到release-v1.15.0 #1034

Merged
merged 3 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions apiserver/eurekaserver/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func checkRegisterRequest(registrationRequest *RegistrationRequest, req *restful
writeHeader(http.StatusBadRequest, rsp)
return false
}
if len(registrationRequest.Instance.InstanceId) == 0 {
if len(registrationRequest.Instance.InstanceId) == 0 && len(registrationRequest.Instance.HostName) == 0 {
log.Errorf("[EUREKA-SERVER] fail to parse register request, uri: %s, client: %s, err: %s",
req.Request.RequestURI, remoteAddr, "instance id required")
writePolarisStatusCode(req, api.InvalidInstanceID)
Expand Down Expand Up @@ -362,9 +362,7 @@ func (h *EurekaServer) RegisterApplication(req *restful.Request, rsp *restful.Re
writeHeader(http.StatusBadRequest, rsp)
return
}
if len(registrationRequest.Instance.InstanceId) == 0 {
registrationRequest.Instance.InstanceId = registrationRequest.Instance.HostName
}

if !checkRegisterRequest(registrationRequest, req, rsp) {
return
}
Expand Down Expand Up @@ -607,7 +605,7 @@ func (h *EurekaServer) UpdateMetadata(req *restful.Request, rsp *restful.Respons
}
metadataMap[key] = values[0]
}
code := h.updateMetadata(context.Background(), instId, metadataMap)
code := h.updateMetadata(context.Background(), appId, instId, metadataMap)
writePolarisStatusCode(req, code)
if code == api.ExecuteSuccess {
log.Infof("[EUREKA-SERVER]instance metadata (instId=%s, appId=%s) has been updated successfully",
Expand Down
7 changes: 4 additions & 3 deletions apiserver/eurekaserver/applications.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ func parseLeaseInfo(leaseInfo *LeaseInfo, instance *apiservice.Instance) {
}

func buildInstance(appName string, instance *apiservice.Instance, lastModifyTime int64) *InstanceInfo {
eurekaInstanceId := instance.GetId().GetValue()
instanceInfo := &InstanceInfo{
CountryId: DefaultCountryIdInt,
Port: &PortWrapper{
Expand All @@ -307,12 +306,14 @@ func buildInstance(appName string, instance *apiservice.Instance, lastModifyTime
}
instanceInfo.AppName = appName
// 属于eureka注册的实例
instanceInfo.InstanceId = eurekaInstanceId
instanceInfo.InstanceId = instance.GetId().GetValue()
metadata := instance.GetMetadata()
if metadata == nil {
metadata = map[string]string{}
}
instanceInfo.AppName = appName
if eurekaInstanceId, ok := metadata[MetadataInstanceId]; ok {
instanceInfo.InstanceId = eurekaInstanceId
}
if hostName, ok := metadata[MetadataHostName]; ok {
instanceInfo.HostName = hostName
}
Expand Down
1 change: 1 addition & 0 deletions apiserver/eurekaserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
optionEnableSelfPreservation = "enableSelfPreservation"
optionPeerNodesToReplicate = "peersToReplicate"
optionCustomValues = "customValues"
optionGenerateUniqueInstId = "generateUniqueInstId"
)

const (
Expand Down
23 changes: 15 additions & 8 deletions apiserver/eurekaserver/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,18 @@ func (h *EurekaServer) handleInstanceEvent(ctx context.Context, i interface{}) e
}
appName := formatReadName(e.Service)
curTimeMilli := time.Now().UnixMilli()
eurekaInstanceId := e.Id
if e.Instance.Metadata != nil {
if _, ok := e.Instance.Metadata[MetadataInstanceId]; ok {
eurekaInstanceId = e.Instance.Metadata[MetadataInstanceId]
}
}
switch e.EType {
case model.EventInstanceOnline, model.EventInstanceUpdate:
instanceInfo := eventToInstance(&e, appName, curTimeMilli)
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
AppName: appName,
Id: e.Id,
Id: eurekaInstanceId,
LastDirtyTimestamp: curTimeMilli,
Status: StatusUp,
InstanceInfo: instanceInfo,
Expand All @@ -242,14 +248,14 @@ func (h *EurekaServer) handleInstanceEvent(ctx context.Context, i interface{}) e
case model.EventInstanceOffline:
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
AppName: appName,
Id: e.Id,
Id: eurekaInstanceId,
Action: actionCancel,
})
case model.EventInstanceSendHeartbeat:
instanceInfo := eventToInstance(&e, appName, curTimeMilli)
rInstance := &ReplicationInstance{
AppName: appName,
Id: e.Id,
Id: eurekaInstanceId,
Status: StatusUp,
InstanceInfo: instanceInfo,
Action: actionHeartbeat,
Expand All @@ -261,17 +267,18 @@ func (h *EurekaServer) handleInstanceEvent(ctx context.Context, i interface{}) e
case model.EventInstanceOpenIsolate:
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
AppName: appName,
Id: e.Id,
Id: eurekaInstanceId,
LastDirtyTimestamp: curTimeMilli,
OverriddenStatus: StatusOutOfService,
Action: actionHeartbeat,
Status: StatusOutOfService,
Action: actionStatusUpdate,
})
case model.EventInstanceCloseIsolate:
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
AppName: appName,
Id: e.Id,
Id: eurekaInstanceId,
LastDirtyTimestamp: curTimeMilli,
Action: actionDeleteStatusOverride,
Status: StatusUp,
Action: actionStatusUpdate,
})

}
Expand Down
10 changes: 9 additions & 1 deletion apiserver/eurekaserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
MetadataSecurePort = "internal-eureka-secure-port"
MetadataSecurePortEnabled = "internal-eureka-secure-port-enabled"
MetadataReplicate = "internal-eureka-replicate"
MetadataInstanceId = "internal-eureka-instance-id"

ServerEureka = "eureka"

Expand Down Expand Up @@ -143,7 +144,8 @@ type EurekaServer struct {
replicateWorker *ReplicateWorker
eventHandlerHandler *EurekaInstanceEventHandler

replicatePeers []string
replicatePeers []string
generateUniqueInstId bool
}

// GetPort 获取端口
Expand Down Expand Up @@ -247,6 +249,12 @@ func (h *EurekaServer) Initialize(ctx context.Context, option map[string]interfa
}
h.enableSelfPreservation = enableSelfPreservation

if value, ok := option[optionGenerateUniqueInstId]; ok {
h.generateUniqueInstId, _ = value.(bool)
} else {
h.generateUniqueInstId = false
}

if raw, _ := option[optionCustomValues].(map[interface{}]interface{}); raw != nil {
for k, v := range raw {
CustomEurekaParameters[k.(string)] = fmt.Sprintf("%v", v)
Expand Down
38 changes: 32 additions & 6 deletions apiserver/eurekaserver/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"math"
"strconv"
"strings"

"github.com/golang/protobuf/ptypes/wrappers"
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
Expand All @@ -31,11 +32,29 @@ import (
"github.com/polarismesh/polaris/common/utils"
)

func buildBaseInstance(instance *InstanceInfo, namespace string, appId string) *apiservice.Instance {
func checkOrBuildNewInstanceId(appId string, instId string, generateUniqueInstId bool) string {
if !generateUniqueInstId {
return instId
}
lowerAppId := strings.ToLower(appId)
lowerInstIdId := strings.ToLower(instId)
if strings.Contains(lowerInstIdId, lowerAppId) {
return instId
}
return lowerAppId + ":" + lowerInstIdId
}

func buildBaseInstance(
instance *InstanceInfo, namespace string, appId string, generateUniqueInstId bool) *apiservice.Instance {
targetInstance := &apiservice.Instance{}
eurekaMetadata := make(map[string]string)

eurekaMetadata[MetadataRegisterFrom] = ServerEureka
eurekaInstanceId := instance.InstanceId
if len(eurekaInstanceId) == 0 {
eurekaInstanceId = instance.HostName
}
eurekaMetadata[MetadataInstanceId] = eurekaInstanceId
if len(instance.AppGroupName) > 0 {
eurekaMetadata[MetadataAppGroupName] = instance.AppGroupName
}
Expand Down Expand Up @@ -69,7 +88,8 @@ func buildBaseInstance(instance *InstanceInfo, namespace string, appId string) *
if len(instance.SecureVipAddress) > 0 {
eurekaMetadata[MetadataSecureVipAddress] = instance.SecureVipAddress
}
targetInstance.Id = &wrappers.StringValue{Value: instance.InstanceId}
targetInstance.Id = &wrappers.StringValue{
Value: checkOrBuildNewInstanceId(appId, eurekaInstanceId, generateUniqueInstId)}
targetInstance.Metadata = eurekaMetadata
targetInstance.Service = &wrappers.StringValue{Value: appId}
targetInstance.Namespace = &wrappers.StringValue{Value: namespace}
Expand Down Expand Up @@ -137,7 +157,8 @@ func buildStatus(instance *InstanceInfo, targetInstance *apiservice.Instance) {
}
}

func convertEurekaInstance(instance *InstanceInfo, namespace string, appId string) *apiservice.Instance {
func convertEurekaInstance(
instance *InstanceInfo, namespace string, appId string, generateUniqueInstId bool) *apiservice.Instance {
var secureEnable bool
var securePort int
var insecureEnable bool
Expand All @@ -160,7 +181,7 @@ func convertEurekaInstance(instance *InstanceInfo, namespace string, appId strin
insecurePort = DefaultInsecurePort
}

targetInstance := buildBaseInstance(instance, namespace, appId)
targetInstance := buildBaseInstance(instance, namespace, appId, generateUniqueInstId)

// 同时打开2个端口,通过medata保存http端口
targetInstance.Protocol = &wrappers.StringValue{Value: InsecureProtocol}
Expand All @@ -179,7 +200,7 @@ func (h *EurekaServer) registerInstances(
ctx = context.WithValue(ctx, utils.ContextOpenAsyncRegis, true)
appId = formatWriteName(appId)
// 1. 先转换数据结构
totalInstance := convertEurekaInstance(instance, h.namespace, appId)
totalInstance := convertEurekaInstance(instance, h.namespace, appId, h.generateUniqueInstId)
if replicated {
totalInstance.Metadata[model.MetadataReplicated] = "true"
}
Expand Down Expand Up @@ -211,6 +232,7 @@ func (h *EurekaServer) deregisterInstance(
ctx = context.WithValue(
ctx, model.CtxEventKeyMetadata, map[string]string{MetadataReplicate: strconv.FormatBool(replicated)})
ctx = context.WithValue(ctx, utils.ContextOpenAsyncRegis, true)
instanceId = checkOrBuildNewInstanceId(appId, instanceId, h.generateUniqueInstId)
resp := h.namingServer.DeregisterInstance(ctx, &apiservice.Instance{Id: &wrappers.StringValue{Value: instanceId}})
return resp.GetCode().GetValue()
}
Expand All @@ -223,6 +245,7 @@ func (h *EurekaServer) updateStatus(
}
ctx = context.WithValue(
ctx, model.CtxEventKeyMetadata, map[string]string{MetadataReplicate: strconv.FormatBool(replicated)})
instanceId = checkOrBuildNewInstanceId(appId, instanceId, h.generateUniqueInstId)
resp := h.namingServer.UpdateInstance(ctx, &apiservice.Instance{
Id: &wrappers.StringValue{Value: instanceId}, Isolate: &wrappers.BoolValue{Value: isolated}})
return resp.GetCode().GetValue()
Expand All @@ -231,6 +254,7 @@ func (h *EurekaServer) updateStatus(
func (h *EurekaServer) renew(ctx context.Context, appId string, instanceId string, replicated bool) uint32 {
ctx = context.WithValue(
ctx, model.CtxEventKeyMetadata, map[string]string{MetadataReplicate: strconv.FormatBool(replicated)})
instanceId = checkOrBuildNewInstanceId(appId, instanceId, h.generateUniqueInstId)
resp := h.healthCheckServer.Report(ctx, &apiservice.Instance{Id: &wrappers.StringValue{Value: instanceId}})
code := resp.GetCode().GetValue()

Expand All @@ -242,7 +266,9 @@ func (h *EurekaServer) renew(ctx context.Context, appId string, instanceId strin
return code
}

func (h *EurekaServer) updateMetadata(ctx context.Context, instanceId string, metadata map[string]string) uint32 {
func (h *EurekaServer) updateMetadata(
ctx context.Context, appId string, instanceId string, metadata map[string]string) uint32 {
instanceId = checkOrBuildNewInstanceId(appId, instanceId, h.generateUniqueInstId)
resp := h.namingServer.UpdateInstance(ctx,
&apiservice.Instance{Id: &wrappers.StringValue{Value: instanceId}, Metadata: metadata})
return resp.GetCode().GetValue()
Expand Down
2 changes: 1 addition & 1 deletion common/redispool/redis_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func (p *Pool) afterHandleTask(startTime time.Time, command string, task *Task,
}
}
p.statis.ReportCallMetrics(metrics.CallMetric{
Type: metrics.ServerCallMetric,
Type: metrics.RedisCallMetric,
API: command,
Code: int(code),
Duration: costDuration,
Expand Down
1 change: 1 addition & 0 deletions release/conf/polaris-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ apiservers:
refreshInterval: 10
deltaExpireInterval: 60
unhealthyExpireInterval: 180
generateUniqueInstId: false
connLimit:
openConnLimit: false
maxConnPerHost: 1024
Expand Down