Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split Sync statuses to smaller objects in etcd #542

Merged
merged 11 commits into from
Mar 16, 2022
66 changes: 26 additions & 40 deletions pkg/api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/object/httpserver"
"github.com/megaease/easegress/pkg/object/rawconfigtrafficcontroller"
"github.com/megaease/easegress/pkg/object/trafficcontroller"
"github.com/megaease/easegress/pkg/supervisor"
)
Expand Down Expand Up @@ -169,52 +168,39 @@ func (s *Server) _listStatusObjects() map[string]map[string]interface{} {
return status
}

// getSubStatusFromTrafficControllerStatus extracts, from a TrafficController
// status, the status of the single object described by spec and returns it
// encoded as YAML.
//
// Only the entry of status.Specs whose Namespace equals
// rawconfigtrafficcontroller.DefaultNamespace is consulted. The empty string
// is returned when no such namespace entry exists, or when spec is an
// HTTPServer/HTTPPipeline that has no entry under spec.Name() in it.
func getSubStatusFromTrafficControllerStatus(status *trafficcontroller.Status, spec *supervisor.Spec) string {
	// toYAML encodes one sub-status; an encoding failure is reported through
	// ClusterPanic, the same way the rest of this file reports cluster errors.
	toYAML := func(v interface{}) string {
		b, err := yaml.Marshal(v)
		if err != nil {
			ClusterPanic(fmt.Errorf("marshal %v to yaml failed: %v", v, err))
		}
		return string(b)
	}

	for _, ns := range status.Specs {
		if ns.Namespace != rawconfigtrafficcontroller.DefaultNamespace {
			continue
		}

		switch spec.Kind() {
		case httpserver.Kind:
			if val, ok := ns.HTTPServers[spec.Name()]; ok {
				return toYAML(val.Status)
			}
			return ""
		case httppipeline.Kind:
			if val, ok := ns.HTTPPipelines[spec.Name()]; ok {
				return toYAML(val.Status)
			}
			return ""
		}
		// Any other kind: keep scanning the remaining namespace entries.
	}
	return ""
}

func (s *Server) _getStatusObjectFromTrafficController(name string, spec *supervisor.Spec) map[string]string {
prefix := s.cluster.Layout().StatusObjectPrefix(trafficcontroller.Kind)
key := s.cluster.Layout().StatusObjectName(trafficcontroller.Kind, name)
prefix := s.cluster.Layout().StatusObjectPrefix(key)
kvs, err := s.cluster.GetPrefix(prefix)
if err != nil {
ClusterPanic(err)
}
status := &trafficcontroller.Status{}
ans := make(map[string]string)
for k, v := range kvs {
// different member
memberName := strings.TrimPrefix(k, prefix)

err = yaml.Unmarshal([]byte(v), status)
if err != nil {
ClusterPanic(fmt.Errorf("unmarshal %s to yaml failed: %v", v, err))
ans := make(map[string]string)
for _, v := range kvs {
if spec.Kind() == httpserver.Kind {
status := &trafficcontroller.HTTPServerStatus{}
err = yaml.Unmarshal([]byte(v), status)
if err != nil {
ClusterPanic(fmt.Errorf("unmarshal %s to yaml failed: %v", v, err))
}
b, err := yaml.Marshal(status.Status)
if err != nil {
ClusterPanic(fmt.Errorf("unmarshal %v to yaml failed: %v", status.Status, err))
}
ans[key] = string(b)
} else if spec.Kind() == httppipeline.Kind {
status := &trafficcontroller.HTTPPipelineStatus{}
err = yaml.Unmarshal([]byte(v), status)
if err != nil {
ClusterPanic(fmt.Errorf("unmarshal %s to yaml failed: %v", v, err))
}
b, err := yaml.Marshal(status.Status)
if err != nil {
ClusterPanic(fmt.Errorf("unmarshal %v to yaml failed: %v", status.Status, err))
}
ans[key] = string(b)
}
nsStatus := getSubStatusFromTrafficControllerStatus(status, spec)
ans[memberName] = nsStatus
}
return ans
}
1 change: 0 additions & 1 deletion pkg/api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func (s *Server) updateObject(w http.ResponseWriter, r *http.Request) {

func (s *Server) listObjects(w http.ResponseWriter, r *http.Request) {
// No need to lock.

specs := specList(s._listObjects())
// NOTE: Keep it consistent.
sort.Sort(specs)
Expand Down
6 changes: 6 additions & 0 deletions pkg/cluster/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
statusMemberPrefix = "/status/members/"
statusMemberFormat = "/status/members/%s" // +memberName
statusObjectPrefix = "/status/objects/"
statusObjectNameFormat = "%s-%s"
statusObjectPrefixFormat = "/status/objects/%s/" // +objectName
statusObjectFormat = "/status/objects/%s/%s" // +objectName +memberName
configObjectPrefix = "/config/objects/"
Expand Down Expand Up @@ -101,6 +102,11 @@ func (l *Layout) StatusObjectPrefix(name string) string {
return fmt.Sprintf(statusObjectPrefixFormat, name)
}

// StatusObjectName builds the name under which the status of an object is
// stored, by formatting kind and specName with statusObjectNameFormat.
func (l *Layout) StatusObjectName(kind string, specName string) string {
	objectName := fmt.Sprintf(statusObjectNameFormat, kind, specName)
	return objectName
}

// StatusObjectKey returns the key of object status.
func (l *Layout) StatusObjectKey(name string) string {
return fmt.Sprintf(statusObjectFormat, name, l.memberName)
Expand Down
101 changes: 82 additions & 19 deletions pkg/object/statussynccontroller/statussynccontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/timetool"

"github.com/megaease/easegress/pkg/object/rawconfigtrafficcontroller"
"github.com/megaease/easegress/pkg/object/trafficcontroller"
)

const (
Expand All @@ -51,6 +54,9 @@ type (
statusesRecords []*StatusesRecord
StatusesRecordsMutex sync.RWMutex

// statusUpdateMaxBatchSize is maximum statuses to update in one cluster transaction
statusUpdateMaxBatchSize int

done chan struct{}
}

Expand Down Expand Up @@ -125,6 +131,13 @@ func (ssc *StatusSyncController) reload() {
ssc.timer = timetool.NewDistributedTimer(nextSyncStatusDuration)
ssc.done = make(chan struct{})

opts := ssc.superSpec.Super().Options()
ssc.statusUpdateMaxBatchSize = opts.StatusUpdateMaxBatchSize
if ssc.statusUpdateMaxBatchSize < 1 {
ssc.statusUpdateMaxBatchSize = 20
}
logger.Infof("StatusUpdateMaxBatchSize is %d", ssc.statusUpdateMaxBatchSize)

go ssc.run()
}

Expand Down Expand Up @@ -152,6 +165,34 @@ func (ssc *StatusSyncController) Close() {
ssc.timer.Close()
}

// safeMarshal encodes value with marshalStatus and returns the result as a
// string together with true. If encoding fails, the failure is logged as a
// bug and ("", false) is returned instead.
func safeMarshal(value *supervisor.Status) (string, bool) {
	encoded, err := marshalStatus(value)
	if err == nil {
		return string(encoded), true
	}

	logger.Errorf("BUG: marshal %#v to yaml failed: %v",
		value, err)
	return "", false
}

// splitRawconfigTrafficControllerStatus breaks the status of one namespace
// into one entry per object returned by status.ToSyncStatus().
//
// For every object, the entry name is derived from kind and the object's key
// through Layout().StatusObjectName. The object status is stored in
// statusesRecord.Statuses, and its marshalled form in statuses, under that
// name.
//
// It returns false as soon as one status cannot be marshalled. Entries
// written before the failure (including the failing object's record in
// statusesRecord) are left in place. It returns true when every object was
// processed.
func (ssc *StatusSyncController) splitRawconfigTrafficControllerStatus(
	kind string,
	status *trafficcontroller.StatusInSameNamespace,
	statuses map[string]string,
	statusesRecord *StatusesRecord) bool {
	for objectKey, objectStatus := range status.ToSyncStatus() {
		entryName := ssc.superSpec.Super().Cluster().Layout().StatusObjectName(kind, objectKey)
		statusesRecord.Statuses[entryName] = objectStatus

		if encoded, ok := safeMarshal(objectStatus); ok {
			statuses[entryName] = encoded
		} else {
			return false
		}
	}
	return true
}

func (ssc *StatusSyncController) handleStatus(unixTimestamp int64) {
statuses := make(map[string]string)
statusesRecord := &StatusesRecord{
Expand All @@ -172,16 +213,24 @@ func (ssc *StatusSyncController) handleStatus(unixTimestamp int64) {
status := entity.Instance().Status()
status.Timestamp = unixTimestamp

statusesRecord.Statuses[name] = status

buff, err := marshalStatus(status)
if err != nil {
logger.Errorf("BUG: marshal %#v to yaml failed: %v",
status, err)
return false
if trafficStatus, ok := status.ObjectStatus.(*trafficcontroller.Status); ok {
statusInNamespaces := trafficStatus.Specs
for _, statInNS := range statusInNamespaces {
if !ssc.splitRawconfigTrafficControllerStatus(name, statInNS, statuses, statusesRecord) {
return false
}
}
return true
} else if rawTrafficStatus, ok := status.ObjectStatus.(*rawconfigtrafficcontroller.Status); ok {
return ssc.splitRawconfigTrafficControllerStatus(name, rawTrafficStatus, statuses, statusesRecord)
} else {
statusesRecord.Statuses[name] = status
marshalledValue, ok := safeMarshal(status)
if !ok {
return false
}
statuses[name] = marshalledValue
}
statuses[name] = string(buff)

return true
}

Expand All @@ -192,29 +241,43 @@ func (ssc *StatusSyncController) handleStatus(unixTimestamp int64) {
}

func (ssc *StatusSyncController) syncStatusToCluster(statuses map[string]string) {
kvs := make(map[string]*string)

// Delete statuses which disappeared in current status.
if ssc.lastSyncStatuses != nil {
kvs := make(map[string]*string)
for k := range ssc.lastSyncStatuses {
if _, exists := statuses[k]; !exists {
k = ssc.superSpec.Super().Cluster().Layout().StatusObjectKey(k)
kvs[k] = nil
}
}
err := ssc.superSpec.Super().Cluster().PutAndDeleteUnderLease(kvs)
if err != nil {
logger.Errorf("sync status failed. If the message size is too large, "+
"please increase the value of cluster.MaxCallSendMsgSize in configuration: %v", err)
}
}

ssc.lastSyncStatuses = statuses

for k, v := range statuses {
k = ssc.superSpec.Super().Cluster().Layout().StatusObjectKey(k)
kvs[k] = &v
kvs := make(map[string]*string)
for k, value := range statuses {
samutamm marked this conversation as resolved.
Show resolved Hide resolved
key := ssc.superSpec.Super().Cluster().Layout().StatusObjectKey(k)
kvs[key] = &value
if len(kvs) >= ssc.statusUpdateMaxBatchSize {
err := ssc.superSpec.Super().Cluster().PutAndDeleteUnderLease(kvs)
if err != nil {
logger.Errorf("sync status failed. If the message size is too large, "+
"please increase the value of cluster.MaxCallSendMsgSize in configuration: %v", err)
}
kvs = make(map[string]*string)
}
}

err := ssc.superSpec.Super().Cluster().PutAndDeleteUnderLease(kvs)
if err != nil {
logger.Errorf("sync status failed. If the message size is too large, "+
"please increase the value of cluster.MaxCallSendMsgSize in configuration: %v", err)
if len(kvs) > 0 {
err := ssc.superSpec.Super().Cluster().PutAndDeleteUnderLease(kvs)
if err != nil {
logger.Errorf("sync status failed. If the message size is too large, "+
"please increase the value of cluster.MaxCallSendMsgSize in configuration: %v", err)
}
}
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/object/trafficcontroller/trafficcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ func (ns *Namespace) GetHandler(name string) (protocol.HTTPHandler, bool) {
return handler, true
}

// toSyncStatus wraps the HTTP server status in a supervisor.Status whose
// ObjectStatus is hss; all other fields keep their zero value.
func (hss *HTTPServerStatus) toSyncStatus() *supervisor.Status {
	wrapped := &supervisor.Status{}
	wrapped.ObjectStatus = hss
	return wrapped
}

// toSyncStatus wraps the HTTP pipeline status in a supervisor.Status whose
// ObjectStatus is hps; all other fields keep their zero value.
func (hps *HTTPPipelineStatus) toSyncStatus() *supervisor.Status {
	wrapped := &supervisor.Status{}
	wrapped.ObjectStatus = hps
	return wrapped
}

// ToSyncStatus flattens the HTTP servers and HTTP pipelines of this namespace
// into a single map from object key to its supervisor.Status wrapper.
//
// Servers are added first and pipelines second, so a pipeline that shares its
// key with a server replaces that server's entry.
func (sisn *StatusInSameNamespace) ToSyncStatus() map[string]*supervisor.Status {
	result := make(map[string]*supervisor.Status)

	for name, serverStatus := range sisn.HTTPServers {
		result[name] = serverStatus.toSyncStatus()
	}

	for name, pipelineStatus := range sisn.HTTPPipelines {
		result[name] = pipelineStatus.toSyncStatus()
	}

	return result
}

// Category returns the category of TrafficController.
func (tc *TrafficController) Category() supervisor.ObjectCategory {
return Category
Expand Down
7 changes: 6 additions & 1 deletion pkg/option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/megaease/easegress/pkg/version"
)

// ClusterOptions is the start-up options.
// ClusterOptions defines the cluster members.
type ClusterOptions struct {
// Primary members define following URLs to form a cluster.
ListenPeerURLs []string `yaml:"listen-peer-urls"`
Expand Down Expand Up @@ -95,6 +95,9 @@ type Options struct {
CPUProfileFile string `yaml:"cpu-profile-file"`
MemoryProfileFile string `yaml:"memory-profile-file"`

// Status
StatusUpdateMaxBatchSize int `yaml:"status-update-max-batch-size"`

// Prepare the items below in advance.
AbsHomeDir string `yaml:"-"`
AbsDataDir string `yaml:"-"`
Expand Down Expand Up @@ -161,6 +164,8 @@ func New() *Options {
opt.flags.StringVar(&opt.CPUProfileFile, "cpu-profile-file", "", "Path to the CPU profile file.")
opt.flags.StringVar(&opt.MemoryProfileFile, "memory-profile-file", "", "Path to the memory profile file.")

opt.flags.IntVar(&opt.StatusUpdateMaxBatchSize, "status-update-max-batch-size", 20, "Number of object statuses to update at maximum in one transaction.")

opt.viper.BindPFlags(opt.flags)

return opt
Expand Down