Skip to content

Commit

Permalink
add maintain job (#1029)
Browse files Browse the repository at this point in the history
  • Loading branch information
shichaoyuan authored Mar 21, 2023
1 parent a4b9de6 commit 2f2fc69
Show file tree
Hide file tree
Showing 24 changed files with 915 additions and 56 deletions.
2 changes: 2 additions & 0 deletions bootstrap/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/polarismesh/polaris/cache"
"github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/config"
"github.com/polarismesh/polaris/maintain"
"github.com/polarismesh/polaris/namespace"
"github.com/polarismesh/polaris/plugin"
"github.com/polarismesh/polaris/service"
Expand All @@ -46,6 +47,7 @@ type Config struct {
Naming service.Config `yaml:"naming"`
Config config.Config `yaml:"config"`
HealthChecks healthcheck.Config `yaml:"healthcheck"`
Maintain maintain.Config `yaml:"maintain"`
Store store.Config `yaml:"store"`
Auth auth.Config `yaml:"auth"`
Plugin plugin.Config `yaml:"plugin"`
Expand Down
4 changes: 2 additions & 2 deletions bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func StartComponents(ctx context.Context, cfg *boot_config.Config) error {
return err
}

namingSvr, err := service.GetOriginServer()
namingSvr, err := service.GetServer()
if err != nil {
return err
}
Expand All @@ -197,7 +197,7 @@ func StartComponents(ctx context.Context, cfg *boot_config.Config) error {
}

// 初始化运维操作模块
if err := maintain.Initialize(ctx, namingSvr, healthCheckServer, s); err != nil {
if err := maintain.Initialize(ctx, &cfg.Maintain, namingSvr, healthCheckServer, cacheMgn, s); err != nil {
return err
}

Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ require (

require github.com/polarismesh/specification v1.2.0

require github.com/DATA-DOG/go-sqlmock v1.5.0
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/robfig/cron/v3 v3.0.1
)

replace gopkg.in/yaml.v2 => gopkg.in/yaml.v2 v2.2.2
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
25 changes: 25 additions & 0 deletions maintain/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package maintain

import "github.com/polarismesh/polaris/maintain/job"

// Config maintain configuration
type Config struct {
Jobs []job.JobConfig `yaml:"jobs"`
}
18 changes: 13 additions & 5 deletions maintain/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"errors"

"github.com/polarismesh/polaris/auth"
"github.com/polarismesh/polaris/cache"
"github.com/polarismesh/polaris/maintain/job"
"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/service/healthcheck"
"github.com/polarismesh/polaris/store"
Expand All @@ -34,14 +36,14 @@ var (
)

// Initialize 初始化
func Initialize(ctx context.Context, namingService service.DiscoverServer, healthCheckServer *healthcheck.Server,
storage store.Store) error {
func Initialize(ctx context.Context, cfg *Config, namingService service.DiscoverServer,
healthCheckServer *healthcheck.Server, cacheMgn *cache.CacheManager, storage store.Store) error {

if finishInit {
return nil
}

err := initialize(ctx, namingService, healthCheckServer, storage)
err := initialize(ctx, cfg, namingService, healthCheckServer, cacheMgn, storage)
if err != nil {
return err
}
Expand All @@ -50,8 +52,8 @@ func Initialize(ctx context.Context, namingService service.DiscoverServer, healt
return nil
}

func initialize(_ context.Context, namingService service.DiscoverServer, healthCheckServer *healthcheck.Server,
storage store.Store) error {
func initialize(_ context.Context, cfg *Config, namingService service.DiscoverServer,
healthCheckServer *healthcheck.Server, cacheMgn *cache.CacheManager, storage store.Store) error {

authServer, err := auth.GetAuthServer()
if err != nil {
Expand All @@ -60,8 +62,14 @@ func initialize(_ context.Context, namingService service.DiscoverServer, healthC

maintainServer.namingServer = namingService
maintainServer.healthCheckServer = healthCheckServer
maintainServer.cacheMgn = cacheMgn
maintainServer.storage = storage

maintainJobs := job.NewMaintainJobs(namingService, cacheMgn, storage)
if err := maintainJobs.StartMaintianJobs(cfg.Jobs); err != nil {
return err
}

server = newServerAuthAbility(maintainServer, authServer)
return nil
}
Expand Down
50 changes: 50 additions & 0 deletions maintain/job/clean_deleted_instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package job

import (
"github.com/polarismesh/polaris/store"
)

type cleanDeletedInstancesJob struct {
storage store.Store
}

func (job *cleanDeletedInstancesJob) init(raw map[string]interface{}) error {
return nil
}

func (job *cleanDeletedInstancesJob) execute() {
batchSize := uint32(100)
for {
count, err := job.storage.BatchCleanDeletedInstances(batchSize)
if err != nil {
log.Errorf("[Maintain][Job][CleanDeletedInstances] batch clean deleted instance, err: %v", err)
break
}

log.Infof("[Maintain][Job][CleanDeletedInstances] clean deleted instance count %d", count)

if count < batchSize {
break
}
}
}

func (job *cleanDeletedInstancesJob) clear() {
}
26 changes: 26 additions & 0 deletions maintain/job/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package job

// JobcConfig maintain job configuration
type JobConfig struct {
Name string `yaml:"name"`
Enable bool `yaml:"enable"`
CronSpec string `yaml:"cronSpec"`
Option map[string]interface{} `yaml:"option"`
}
159 changes: 159 additions & 0 deletions maintain/job/delete_empty_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package job

import (
"time"

"github.com/mitchellh/mapstructure"
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"

"github.com/polarismesh/polaris/cache"
api "github.com/polarismesh/polaris/common/api/v1"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/store"
)

type DeleteEmptyAutoCreatedServiceJobConfig struct {
ServiceDeleteTimeout time.Duration `mapstructure:"serviceDeleteTimeout"`
}

type deleteEmptyAutoCreatedServiceJob struct {
cfg *DeleteEmptyAutoCreatedServiceJobConfig
namingServer service.DiscoverServer
cacheMgn *cache.CacheManager
storage store.Store
emptyServices map[string]time.Time
}

func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) error {
cfg := &DeleteEmptyAutoCreatedServiceJobConfig{}
decodeConfig := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
Result: cfg,
}
decoder, err := mapstructure.NewDecoder(decodeConfig)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] parse config err: %v", err)
return err
}
job.cfg = cfg
job.emptyServices = map[string]time.Time{}
return nil
}

func (job *deleteEmptyAutoCreatedServiceJob) execute() {
err := job.deleteEmptyAutoCreatedServices()
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty autocreated services, err: %v", err)
}
}

func (job *deleteEmptyAutoCreatedServiceJob) clear() {
job.emptyServices = map[string]time.Time{}
}

func (job *deleteEmptyAutoCreatedServiceJob) getEmptyAutoCreatedServices() []*model.Service {
services := job.getAllEmptyAutoCreatedServices()
return job.filterToDeletedServices(services, time.Now(), job.cfg.ServiceDeleteTimeout)
}

func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() []*model.Service {
var res []*model.Service
_ = job.cacheMgn.Service().IteratorServices(func(key string, svc *model.Service) (bool, error) {
if svc.IsAlias() {
return true, nil
}
v, ok := svc.Meta[service.MetadataInternalAutoCreated]
if !ok || v != "true" {
return true, nil
}
count := job.cacheMgn.Instance().GetInstancesCountByServiceID(svc.ID)
if count.TotalInstanceCount == 0 {
res = append(res, svc)
}
return true, nil
})
return res
}

func (job *deleteEmptyAutoCreatedServiceJob) filterToDeletedServices(services []*model.Service,
now time.Time, timeout time.Duration) []*model.Service {
var toDeleteServices []*model.Service
m := map[string]time.Time{}
for _, svc := range services {
value, ok := job.emptyServices[svc.ID]
if !ok {
m[svc.ID] = now
continue
}
if now.After(value.Add(timeout)) {
toDeleteServices = append(toDeleteServices, svc)
} else {
m[svc.ID] = value
}
}
job.emptyServices = m

return toDeleteServices
}

func (job *deleteEmptyAutoCreatedServiceJob) deleteEmptyAutoCreatedServices() error {
emptyServices := job.getEmptyAutoCreatedServices()

deleteBatchSize := 100
for i := 0; i < len(emptyServices); i += deleteBatchSize {
j := i + deleteBatchSize
if j > len(emptyServices) {
j = len(emptyServices)
}

ctx, err := buildContext(job.storage)
if err != nil {
log.Errorf("[Maintain][Job][DeleteUnHealthyInstance] build conetxt, err: %v", err)
return err
}
resp := job.namingServer.DeleteServices(ctx, convertDeleteServiceRequest(emptyServices[i:j]))
if api.CalcCode(resp) != 200 {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete services err, code: %d, info: %s",
resp.Code.GetValue(), resp.Info.GetValue())
}
}

log.Infof("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty auto-created services count %d",
len(emptyServices))
return nil
}

func convertDeleteServiceRequest(infos []*model.Service) []*apiservice.Service {
var entries = make([]*apiservice.Service, len(infos))
for i, info := range infos {
entries[i] = &apiservice.Service{
Namespace: utils.NewStringValue(info.Namespace),
Name: utils.NewStringValue(info.Name),
}
}
return entries
}
Loading

0 comments on commit 2f2fc69

Please sign in to comment.