diff --git a/bootstrap/config/config.go b/bootstrap/config/config.go index 8a26f92b8..cd44c8ec3 100644 --- a/bootstrap/config/config.go +++ b/bootstrap/config/config.go @@ -30,6 +30,7 @@ import ( "github.com/polarismesh/polaris/cache" "github.com/polarismesh/polaris/common/log" "github.com/polarismesh/polaris/config" + "github.com/polarismesh/polaris/maintain" "github.com/polarismesh/polaris/namespace" "github.com/polarismesh/polaris/plugin" "github.com/polarismesh/polaris/service" @@ -46,6 +47,7 @@ type Config struct { Naming service.Config `yaml:"naming"` Config config.Config `yaml:"config"` HealthChecks healthcheck.Config `yaml:"healthcheck"` + Maintain maintain.Config `yaml:"maintain"` Store store.Config `yaml:"store"` Auth auth.Config `yaml:"auth"` Plugin plugin.Config `yaml:"plugin"` diff --git a/bootstrap/server.go b/bootstrap/server.go index a26cd008c..d317c9f8b 100644 --- a/bootstrap/server.go +++ b/bootstrap/server.go @@ -187,7 +187,7 @@ func StartComponents(ctx context.Context, cfg *boot_config.Config) error { return err } - namingSvr, err := service.GetOriginServer() + namingSvr, err := service.GetServer() if err != nil { return err } @@ -197,7 +197,7 @@ func StartComponents(ctx context.Context, cfg *boot_config.Config) error { } // 初始化运维操作模块 - if err := maintain.Initialize(ctx, namingSvr, healthCheckServer, s); err != nil { + if err := maintain.Initialize(ctx, &cfg.Maintain, namingSvr, healthCheckServer, cacheMgn, s); err != nil { return err } diff --git a/go.mod b/go.mod index f9d770176..0589a7b29 100644 --- a/go.mod +++ b/go.mod @@ -84,6 +84,9 @@ require ( require github.com/polarismesh/specification v1.2.0 -require github.com/DATA-DOG/go-sqlmock v1.5.0 +require ( + github.com/DATA-DOG/go-sqlmock v1.5.0 + github.com/robfig/cron/v3 v3.0.1 +) replace gopkg.in/yaml.v2 => gopkg.in/yaml.v2 v2.2.2 diff --git a/go.sum b/go.sum index e0d9f9250..9e74ff141 100644 --- a/go.sum +++ b/go.sum @@ -346,6 +346,8 @@ github.com/prometheus/procfs v0.1.3/go.mod 
h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/maintain/config.go b/maintain/config.go new file mode 100644 index 000000000..a4cd867e2 --- /dev/null +++ b/maintain/config.go @@ -0,0 +1,25 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package maintain + +import "github.com/polarismesh/polaris/maintain/job" + +// Config maintain configuration +type Config struct { + Jobs []job.JobConfig `yaml:"jobs"` +} diff --git a/maintain/default.go b/maintain/default.go index b700dc799..3e06c0ec1 100644 --- a/maintain/default.go +++ b/maintain/default.go @@ -22,6 +22,8 @@ import ( "errors" "github.com/polarismesh/polaris/auth" + "github.com/polarismesh/polaris/cache" + "github.com/polarismesh/polaris/maintain/job" "github.com/polarismesh/polaris/service" "github.com/polarismesh/polaris/service/healthcheck" "github.com/polarismesh/polaris/store" @@ -34,14 +36,14 @@ var ( ) // Initialize 初始化 -func Initialize(ctx context.Context, namingService service.DiscoverServer, healthCheckServer *healthcheck.Server, - storage store.Store) error { +func Initialize(ctx context.Context, cfg *Config, namingService service.DiscoverServer, + healthCheckServer *healthcheck.Server, cacheMgn *cache.CacheManager, storage store.Store) error { if finishInit { return nil } - err := initialize(ctx, namingService, healthCheckServer, storage) + err := initialize(ctx, cfg, namingService, healthCheckServer, cacheMgn, storage) if err != nil { return err } @@ -50,8 +52,8 @@ func Initialize(ctx context.Context, namingService service.DiscoverServer, healt return nil } -func initialize(_ context.Context, namingService service.DiscoverServer, healthCheckServer *healthcheck.Server, - storage store.Store) error { +func initialize(_ context.Context, cfg *Config, namingService service.DiscoverServer, + healthCheckServer *healthcheck.Server, cacheMgn *cache.CacheManager, storage store.Store) error { authServer, err := auth.GetAuthServer() if err != nil { @@ -60,8 +62,14 @@ func initialize(_ context.Context, namingService service.DiscoverServer, healthC maintainServer.namingServer = namingService maintainServer.healthCheckServer = healthCheckServer + maintainServer.cacheMgn = cacheMgn maintainServer.storage = storage + maintainJobs := 
job.NewMaintainJobs(namingService, cacheMgn, storage) + if err := maintainJobs.StartMaintianJobs(cfg.Jobs); err != nil { + return err + } + server = newServerAuthAbility(maintainServer, authServer) return nil } diff --git a/maintain/job/clean_deleted_instance.go b/maintain/job/clean_deleted_instance.go new file mode 100644 index 000000000..e1809c684 --- /dev/null +++ b/maintain/job/clean_deleted_instance.go @@ -0,0 +1,50 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package job + +import ( + "github.com/polarismesh/polaris/store" +) + +type cleanDeletedInstancesJob struct { + storage store.Store +} + +func (job *cleanDeletedInstancesJob) init(raw map[string]interface{}) error { + return nil +} + +func (job *cleanDeletedInstancesJob) execute() { + batchSize := uint32(100) + for { + count, err := job.storage.BatchCleanDeletedInstances(batchSize) + if err != nil { + log.Errorf("[Maintain][Job][CleanDeletedInstances] batch clean deleted instance, err: %v", err) + break + } + + log.Infof("[Maintain][Job][CleanDeletedInstances] clean deleted instance count %d", count) + + if count < batchSize { + break + } + } +} + +func (job *cleanDeletedInstancesJob) clear() { +} diff --git a/maintain/job/config.go b/maintain/job/config.go new file mode 100644 index 000000000..d64785851 --- /dev/null +++ b/maintain/job/config.go @@ -0,0 +1,26 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package job + +// JobConfig maintain job configuration +type JobConfig struct { + Name string `yaml:"name"` + Enable bool `yaml:"enable"` + CronSpec string `yaml:"cronSpec"` + Option map[string]interface{} `yaml:"option"` +} diff --git a/maintain/job/delete_empty_service.go b/maintain/job/delete_empty_service.go new file mode 100644 index 000000000..d91da1d64 --- /dev/null +++ b/maintain/job/delete_empty_service.go @@ -0,0 +1,159 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package job + +import ( + "time" + + "github.com/mitchellh/mapstructure" + apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" + + "github.com/polarismesh/polaris/cache" + api "github.com/polarismesh/polaris/common/api/v1" + "github.com/polarismesh/polaris/common/model" + "github.com/polarismesh/polaris/common/utils" + "github.com/polarismesh/polaris/service" + "github.com/polarismesh/polaris/store" +) + +type DeleteEmptyAutoCreatedServiceJobConfig struct { + ServiceDeleteTimeout time.Duration `mapstructure:"serviceDeleteTimeout"` +} + +type deleteEmptyAutoCreatedServiceJob struct { + cfg *DeleteEmptyAutoCreatedServiceJobConfig + namingServer service.DiscoverServer + cacheMgn *cache.CacheManager + storage store.Store + emptyServices map[string]time.Time +} + +func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) error { + cfg := &DeleteEmptyAutoCreatedServiceJobConfig{} + decodeConfig := &mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + Result: cfg, + } + decoder, err := mapstructure.NewDecoder(decodeConfig) + if err != nil { + log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] new config decoder err: %v", err) + return err + } + err = decoder.Decode(raw) + if err != nil { + log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] parse config err: %v", err) + return err + } + job.cfg = cfg + job.emptyServices = map[string]time.Time{} + return nil +} + +func (job *deleteEmptyAutoCreatedServiceJob) execute() { + err := job.deleteEmptyAutoCreatedServices() + if err != nil { + log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty autocreated services, err: %v", err) + } +} + +func (job *deleteEmptyAutoCreatedServiceJob) clear() { + job.emptyServices = map[string]time.Time{} +} + +func (job *deleteEmptyAutoCreatedServiceJob) getEmptyAutoCreatedServices() []*model.Service { + services := job.getAllEmptyAutoCreatedServices() + return 
job.filterToDeletedServices(services, time.Now(), job.cfg.ServiceDeleteTimeout) +} + +func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() []*model.Service { + var res []*model.Service + _ = job.cacheMgn.Service().IteratorServices(func(key string, svc *model.Service) (bool, error) { + if svc.IsAlias() { + return true, nil + } + v, ok := svc.Meta[service.MetadataInternalAutoCreated] + if !ok || v != "true" { + return true, nil + } + count := job.cacheMgn.Instance().GetInstancesCountByServiceID(svc.ID) + if count.TotalInstanceCount == 0 { + res = append(res, svc) + } + return true, nil + }) + return res +} + +func (job *deleteEmptyAutoCreatedServiceJob) filterToDeletedServices(services []*model.Service, + now time.Time, timeout time.Duration) []*model.Service { + var toDeleteServices []*model.Service + m := map[string]time.Time{} + for _, svc := range services { + value, ok := job.emptyServices[svc.ID] + if !ok { + m[svc.ID] = now + continue + } + if now.After(value.Add(timeout)) { + toDeleteServices = append(toDeleteServices, svc) + } else { + m[svc.ID] = value + } + } + job.emptyServices = m + + return toDeleteServices +} + +func (job *deleteEmptyAutoCreatedServiceJob) deleteEmptyAutoCreatedServices() error { + emptyServices := job.getEmptyAutoCreatedServices() + + deleteBatchSize := 100 + for i := 0; i < len(emptyServices); i += deleteBatchSize { + j := i + deleteBatchSize + if j > len(emptyServices) { + j = len(emptyServices) + } + + ctx, err := buildContext(job.storage) + if err != nil { + log.Errorf("[Maintain][Job][DeleteUnHealthyInstance] build conetxt, err: %v", err) + return err + } + resp := job.namingServer.DeleteServices(ctx, convertDeleteServiceRequest(emptyServices[i:j])) + if api.CalcCode(resp) != 200 { + log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete services err, code: %d, info: %s", + resp.Code.GetValue(), resp.Info.GetValue()) + } + } + + log.Infof("[Maintain][Job][DeleteEmptyAutoCreatedService] delete 
empty auto-created services count %d", + len(emptyServices)) + return nil +} + +func convertDeleteServiceRequest(infos []*model.Service) []*apiservice.Service { + var entries = make([]*apiservice.Service, len(infos)) + for i, info := range infos { + entries[i] = &apiservice.Service{ + Namespace: utils.NewStringValue(info.Namespace), + Name: utils.NewStringValue(info.Name), + } + } + return entries +} diff --git a/maintain/job/delete_empty_service_test.go b/maintain/job/delete_empty_service_test.go new file mode 100644 index 000000000..e115e9b2a --- /dev/null +++ b/maintain/job/delete_empty_service_test.go @@ -0,0 +1,95 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package job + +import ( + "testing" + "time" + + "github.com/polarismesh/polaris/common/model" +) + +func Test_DeleteEmptyAutoCreatedServiceJobConfigInit(t *testing.T) { + expectValue := 1 * time.Minute + raw := map[string]interface{}{ + "serviceDeleteTimeout": "1m", + } + + job := deleteEmptyAutoCreatedServiceJob{} + err := job.init(raw) + if err != nil { + t.Errorf("init deleteEmptyAutoCreatedServiceJob config, err: %v", err) + } + + if job.cfg.ServiceDeleteTimeout != expectValue { + t.Errorf("init deleteEmptyAutoCreatedServiceJob config. 
expect: %s, actual: %s", + expectValue, job.cfg.ServiceDeleteTimeout) + } +} + +func Test_DeleteEmptyAutoCreatedServiceJobConfigInitErr(t *testing.T) { + raw := map[string]interface{}{ + "serviceDeleteTimeout": "xx", + } + + job := deleteEmptyAutoCreatedServiceJob{} + err := job.init(raw) + if err == nil { + t.Errorf("init deleteEmptyAutoCreatedServiceJob config should err") + } +} + +func Test_FilterToDeletedServices(t *testing.T) { + + job := deleteEmptyAutoCreatedServiceJob{} + t1, _ := time.Parse("2006-01-02 15:04:05", "2023-03-20 12:01:00") + t2, _ := time.Parse("2006-01-02 15:04:05", "2023-03-20 12:02:00") + job.emptyServices = map[string]time.Time{ + "a": t1, + "b": t2, + } + + services := []*model.Service{ + { + ID: "a", + }, + { + ID: "b", + }, + { + ID: "c", + }, + } + + now, _ := time.Parse("2006-01-02 15:04:05", "2023-03-20 12:03:00") + toDeleteServices := job.filterToDeletedServices(services, now, time.Minute) + if len(toDeleteServices) != 1 { + t.Errorf("one service should be deleted") + } + if toDeleteServices[0].ID != "a" { + t.Errorf("to deleted service. expect: %s, actual: %s", "a", toDeleteServices[0].ID) + } + + if len(job.emptyServices) != 2 { + t.Errorf("two service should be candicated, actual: %v", job.emptyServices) + } + svcBTime := job.emptyServices["b"] + if svcBTime != t2 { + t.Errorf("empty service record time. expect: %s, actual: %s", t2, svcBTime) + } +} diff --git a/maintain/job/delete_unhealthy_instance.go b/maintain/job/delete_unhealthy_instance.go new file mode 100644 index 000000000..c93c778a7 --- /dev/null +++ b/maintain/job/delete_unhealthy_instance.go @@ -0,0 +1,103 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package job + +import ( + "time" + + "github.com/mitchellh/mapstructure" + apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" + + api "github.com/polarismesh/polaris/common/api/v1" + "github.com/polarismesh/polaris/common/utils" + "github.com/polarismesh/polaris/service" + "github.com/polarismesh/polaris/store" +) + +type DeleteUnHealthyInstanceJobConfig struct { + InstanceDeleteTimeout time.Duration `mapstructure:"instanceDeleteTimeout"` +} + +type deleteUnHealthyInstanceJob struct { + cfg *DeleteUnHealthyInstanceJobConfig + namingServer service.DiscoverServer + storage store.Store +} + +func (job *deleteUnHealthyInstanceJob) init(raw map[string]interface{}) error { + cfg := &DeleteUnHealthyInstanceJobConfig{} + decodeConfig := &mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + Result: cfg, + } + decoder, err := mapstructure.NewDecoder(decodeConfig) + if err != nil { + log.Errorf("[Maintain][Job][DeleteUnHealthyInstance] new config decoder err: %v", err) + return err + } + err = decoder.Decode(raw) + if err != nil { + log.Errorf("[Maintain][Job][DeleteUnHealthyInstance] parse config err: %v", err) + return err + } + job.cfg = cfg + + return nil +} + +func (job *deleteUnHealthyInstanceJob) execute() { + batchSize := uint32(100) + var count int = 0 + for { + instanceIds, err := job.storage.GetUnHealthyInstances(job.cfg.InstanceDeleteTimeout, batchSize) + if err != nil { + log.Errorf("[Maintain][Job][DeleteUnHealthyInstance] get unhealthy instances, err: %v", err) + break + } 
+ if len(instanceIds) == 0 { + break + } + + var req []*apiservice.Instance + for _, id := range instanceIds { + req = append(req, &apiservice.Instance{Id: utils.NewStringValue(id)}) + } + + ctx, err := buildContext(job.storage) + if err != nil { + log.Errorf("[Maintain][Job][DeleteUnHealthyInstance] build conetxt, err: %v", err) + return + } + resp := job.namingServer.DeleteInstances(ctx, req) + if api.CalcCode(resp) == 200 { + log.Infof("[Maintain][Job][DeleteUnHealthyInstance] delete instance count %d, list: %v", + len(instanceIds), instanceIds) + } else { + log.Errorf("[Maintain][Job][DeleteUnHealthyInstance] delete instance list: %v, err: %d %s", + instanceIds, resp.Code.GetValue(), resp.Info.GetValue()) + break + } + count += len(instanceIds) + } + + log.Infof("[Maintain][Job][DeleteUnHealthyInstance] delete unhealthy instance count %d", count) + +} + +func (job *deleteUnHealthyInstanceJob) clear() { +} diff --git a/maintain/job/delete_unhealthy_instance_test.go b/maintain/job/delete_unhealthy_instance_test.go new file mode 100644 index 000000000..dcb6d5748 --- /dev/null +++ b/maintain/job/delete_unhealthy_instance_test.go @@ -0,0 +1,53 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package job + +import ( + "testing" + "time" +) + +func Test_DeleteUnHealthyInstanceJobConfigInit(t *testing.T) { + expectValue := 10 * time.Minute + raw := map[string]interface{}{ + "instanceDeleteTimeout": "10m", + } + + job := deleteUnHealthyInstanceJob{} + err := job.init(raw) + if err != nil { + t.Errorf("init deleteUnHealthyInstanceJob config, err: %v", err) + } + + if job.cfg.InstanceDeleteTimeout != expectValue { + t.Errorf("init deleteUnHealthyInstanceJob config. expect: %s, actual: %s", + expectValue, job.cfg.InstanceDeleteTimeout) + } +} + +func Test_DeleteUnHealthyInstanceJobConfigInitErr(t *testing.T) { + raw := map[string]interface{}{ + "instanceDeleteTimeout": "xx", + } + + job := deleteUnHealthyInstanceJob{} + err := job.init(raw) + if err == nil { + t.Errorf("init deleteUnHealthyInstanceJob config should err") + } +} diff --git a/maintain/job/job.go b/maintain/job/job.go new file mode 100644 index 000000000..cd73b67db --- /dev/null +++ b/maintain/job/job.go @@ -0,0 +1,148 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package job + +import ( + "context" + "fmt" + + "github.com/robfig/cron/v3" + + "github.com/polarismesh/polaris/cache" + commonlog "github.com/polarismesh/polaris/common/log" + "github.com/polarismesh/polaris/common/utils" + "github.com/polarismesh/polaris/service" + "github.com/polarismesh/polaris/store" +) + +var log = commonlog.GetScopeOrDefaultByName(commonlog.DefaultLoggerName) + +// MaintainJobs +type MaintainJobs struct { + jobs map[string]maintainJob + startedJobs map[string]maintainJob + scheduler *cron.Cron + storage store.Store +} + +// NewMaintainJobs +func NewMaintainJobs(namingServer service.DiscoverServer, cacheMgn *cache.CacheManager, + storage store.Store) *MaintainJobs { + return &MaintainJobs{ + jobs: map[string]maintainJob{ + "DeleteUnHealthyInstance": &deleteUnHealthyInstanceJob{ + namingServer: namingServer, storage: storage}, + "DeleteEmptyAutoCreatedService": &deleteEmptyAutoCreatedServiceJob{ + namingServer: namingServer, cacheMgn: cacheMgn, storage: storage}, + "CleanDeletedInstances": &cleanDeletedInstancesJob{ + storage: storage}, + }, + startedJobs: map[string]maintainJob{}, + scheduler: newCron(), + storage: storage, + } +} + +// StartMaintainJobs +func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error { + for _, cfg := range configs { + if !cfg.Enable { + log.Infof("[Maintain][Job] job (%s) not enable", cfg.Name) + continue + } + job, ok := mj.jobs[cfg.Name] + if !ok { + return fmt.Errorf("[Maintain][Job] job (%s) not exist", cfg.Name) + } + _, ok = mj.startedJobs[cfg.Name] + if ok { + return fmt.Errorf("[Maintain][Job] job (%s) duplicated", cfg.Name) + } + err := job.init(cfg.Option) + if err != nil { + log.Errorf("[Maintain][Job] job (%s) fail to init, err: %v", cfg.Name, err) + return fmt.Errorf("[Maintain][Job] job (%s) fail to init", cfg.Name) + } + err = mj.storage.StartLeaderElection(store.ElectionKeyMaintainJobPrefix + cfg.Name) + if err != nil { + log.Errorf("[Maintain][Job][%s] start leader election 
err: %v", cfg.Name, err) + return err + } + _, err = mj.scheduler.AddFunc(cfg.CronSpec, newCronCmd(cfg.Name, job, mj.storage)) + if err != nil { + log.Errorf("[Maintain][Job] job (%s) fail to start, err: %v", cfg.Name, err) + return fmt.Errorf("[Maintain][Job] job (%s) fail to start", cfg.Name) + } + mj.startedJobs[cfg.Name] = job + } + mj.scheduler.Start() + return nil +} + +// StopMaintainJobs +func (mj *MaintainJobs) StopMaintainJobs() { + ctx := mj.scheduler.Stop() + <-ctx.Done() + mj.startedJobs = map[string]maintainJob{} +} + +func newCron() *cron.Cron { + return cron.New(cron.WithChain( + cron.Recover(cron.DefaultLogger)), + cron.WithParser(cron.NewParser( + cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor))) +} + +func newCronCmd(name string, job maintainJob, storage store.Store) func() { + return func() { + if !storage.IsLeader(store.ElectionKeyMaintainJobPrefix + name) { + log.Infof("[Maintain][Job][%s] I am follower", name) + job.clear() + return + } + log.Infof("[Maintain][Job][%s] I am leader, job start", name) + job.execute() + log.Infof("[Maintain][Job][%s] I am leader, job end", name) + + } +} + +type maintainJob interface { + init(cfg map[string]interface{}) error + execute() + clear() +} + +func getMasterAccountToken(storage store.Store) (string, error) { + user, err := storage.GetUserByName("polaris", "") + if err != nil { + return "", err + } + return user.Token, nil +} + +func buildContext(storage store.Store) (context.Context, error) { + token, err := getMasterAccountToken(storage) + if err != nil { + return nil, err + } + ctx := context.Background() + ctx = context.WithValue(ctx, utils.ContextAuthTokenKey, token) + ctx = context.WithValue(ctx, utils.ContextOperator, "maintain-job") + return ctx, nil +} diff --git a/maintain/maintain.go b/maintain/maintain.go index 4bec59348..e6742d21f 100644 --- a/maintain/maintain.go +++ b/maintain/maintain.go @@ -23,11 +23,15 @@ import ( "runtime/debug" "time" + apimodel 
"github.com/polarismesh/specification/source/go/api/v1/model" apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" + "go.uber.org/zap" + api "github.com/polarismesh/polaris/common/api/v1" connlimit "github.com/polarismesh/polaris/common/conn/limit" commonlog "github.com/polarismesh/polaris/common/log" "github.com/polarismesh/polaris/common/model" + "github.com/polarismesh/polaris/common/utils" "github.com/polarismesh/polaris/plugin" ) @@ -135,7 +139,28 @@ func (s *Server) FreeOSMemory(_ context.Context) error { } func (s *Server) CleanInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response { - return s.namingServer.CleanInstance(ctx, req) + getInstanceID := func() (string, *apiservice.Response) { + if req.GetId() != nil { + if req.GetId().GetValue() == "" { + return "", api.NewInstanceResponse(apimodel.Code_InvalidInstanceID, req) + } + return req.GetId().GetValue(), nil + } + return utils.CheckInstanceTetrad(req) + } + + instanceID, resp := getInstanceID() + if resp != nil { + return resp + } + if err := s.storage.CleanInstance(instanceID); err != nil { + log.Error("Clean instance", + zap.String("err", err.Error()), utils.ZapRequestID(utils.ParseRequestID(ctx))) + return api.NewInstanceResponse(apimodel.Code_StoreLayerException, req) + } + + log.Info("Clean instance", utils.ZapRequestID(utils.ParseRequestID(ctx)), utils.ZapInstanceID(instanceID)) + return api.NewInstanceResponse(apimodel.Code_ExecuteSuccess, req) } func (s *Server) BatchCleanInstances(ctx context.Context, batchSize uint32) (uint32, error) { diff --git a/maintain/server.go b/maintain/server.go index 47a86e6b3..2aae0f56b 100644 --- a/maintain/server.go +++ b/maintain/server.go @@ -20,6 +20,7 @@ package maintain import ( "sync" + "github.com/polarismesh/polaris/cache" "github.com/polarismesh/polaris/service" "github.com/polarismesh/polaris/service/healthcheck" "github.com/polarismesh/polaris/store" @@ -31,5 +32,6 @@ type Server struct { mu sync.Mutex 
namingServer service.DiscoverServer healthCheckServer *healthcheck.Server + cacheMgn *cache.CacheManager storage store.Store } diff --git a/release/conf/polaris-server.yaml b/release/conf/polaris-server.yaml index ac528625d..c9b771b19 100644 --- a/release/conf/polaris-server.yaml +++ b/release/conf/polaris-server.yaml @@ -388,6 +388,25 @@ cache: expireTimeAfterWrite: 3600 - name: faultDetectRule # - name: l5 # Load L5 data +# Maintain configuration +maintain: + jobs: + # Clean up long term unhealthy instance + - name: DeleteUnHealthyInstance + enable: false + cronSpec: "0 0 * * ?" + option: + instanceDeleteTimeout: 60m + # Delete auto-created service without an instance + - name: DeleteEmptyAutoCreatedService + enable: false + cronSpec: "*/10 * * * ?" + option: + serviceDeleteTimeout: 30m + # Clean soft deleted instances + - name: CleanDeletedInstances + enable: true + cronSpec: "0 0 * * 1" # Storage configuration store: # Standalone file storage plugin diff --git a/service/api_v1.go b/service/api_v1.go index 0c8e7a9e2..7d69875eb 100644 --- a/service/api_v1.go +++ b/service/api_v1.go @@ -187,9 +187,6 @@ type InstanceOperateServer interface { // GetInstanceLabels Get an instance tag under a service GetInstanceLabels(ctx context.Context, query map[string]string) *apiservice.Response - - // CleanInstance Clean up instance - CleanInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response } // ClientServer Client related operation Client operation interface definition diff --git a/service/instance.go b/service/instance.go index c3d568dd1..cb7171fb8 100644 --- a/service/instance.go +++ b/service/instance.go @@ -833,33 +833,6 @@ func (s *Server) GetInstancesCount(ctx context.Context) *apiservice.BatchQueryRe return out } -// CleanInstance 清理无效的实例(flag == 1) -func (s *Server) CleanInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response { - // 无效数据,不需要鉴权,直接删除 - getInstanceID := func() (string, *apiservice.Response) { - if req.GetId() 
!= nil { - if req.GetId().GetValue() == "" { - return "", api.NewInstanceResponse(apimodel.Code_InvalidInstanceID, req) - } - return req.GetId().GetValue(), nil - } - return utils.CheckInstanceTetrad(req) - } - - instanceID, resp := getInstanceID() - if resp != nil { - return resp - } - if err := s.storage.CleanInstance(instanceID); err != nil { - log.Error("Clean instance", - zap.String("err", err.Error()), utils.ZapRequestID(utils.ParseRequestID(ctx))) - return api.NewInstanceResponse(apimodel.Code_StoreLayerException, req) - } - - log.Info("Clean instance", utils.ZapRequestID(utils.ParseRequestID(ctx)), utils.ZapInstanceID(instanceID)) - return api.NewInstanceResponse(apimodel.Code_ExecuteSuccess, req) -} - // update/delete instance前置条件 func (s *Server) execInstancePreStep(ctx context.Context, req *apiservice.Instance) ( *model.Service, *model.Instance, *apiservice.Response) { diff --git a/service/instance_authability.go b/service/instance_authability.go index 9d9cf1b27..f18ee1cb3 100644 --- a/service/instance_authability.go +++ b/service/instance_authability.go @@ -146,21 +146,6 @@ func (svr *serverAuthAbility) GetInstancesCount(ctx context.Context) *apiservice return svr.targetServer.GetInstancesCount(ctx) } -// CleanInstance clean instance -func (svr *serverAuthAbility) CleanInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response { - authCtx := svr.collectInstanceAuthContext( - ctx, []*apiservice.Instance{req}, model.Delete, "CleanInstance") - - _, err := svr.authMgn.CheckConsolePermission(authCtx) - if err != nil { - return api.NewResponseWithMsg(convertToErrCode(err), err.Error()) - } - ctx = authCtx.GetRequestContext() - ctx = context.WithValue(ctx, utils.ContextAuthContextKey, authCtx) - - return svr.targetServer.CleanInstance(ctx, req) -} - func (svr *serverAuthAbility) GetInstanceLabels(ctx context.Context, query map[string]string) *apiservice.Response { authCtx := svr.collectInstanceAuthContext(ctx, nil, model.Read, 
"GetInstanceLabels") _, err := svr.authMgn.CheckConsolePermission(authCtx) diff --git a/store/boltdb/maintain.go b/store/boltdb/maintain.go index e8d4945aa..9596b8c93 100644 --- a/store/boltdb/maintain.go +++ b/store/boltdb/maintain.go @@ -22,6 +22,8 @@ import ( "sync" "time" + apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" + "github.com/polarismesh/polaris/common/eventhub" "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/common/utils" @@ -132,3 +134,63 @@ func (m *maintainStore) BatchCleanDeletedInstances(batchSize uint32) (uint32, er } return count, nil } + +func (m *maintainStore) GetUnHealthyInstances(timeout time.Duration, limit uint32) ([]string, error) { + return m.getUnHealthyInstancesBefore(time.Now().Add(-timeout), limit) +} + +func (m *maintainStore) getUnHealthyInstancesBefore(mtime time.Time, limit uint32) ([]string, error) { + fields := []string{insFieldProto, insFieldValid} + instances, err := m.handler.LoadValuesByFilter(tblNameInstance, fields, &model.Instance{}, + func(m map[string]interface{}) bool { + + valid, ok := m[insFieldValid] + if ok && !valid.(bool) { + return false + } + + insProto, ok := m[insFieldProto] + if !ok { + return false + } + + ins := insProto.(*apiservice.Instance) + + insMtime, err := time.Parse("2006-01-02 15:04:05", ins.GetMtime().GetValue()) + if err != nil { + log.Errorf("[Store][boltdb] parse instance mtime error, %v", err) + return false + } + + if insMtime.Before(mtime) { + return false + } + + if !ins.GetEnableHealthCheck().GetValue() { + return false + } + + if ins.GetHealthy().GetValue() { + return false + } + + return true + }) + + if err != nil { + log.Errorf("[Store][boltdb] load instance from kv error, %v", err) + return nil, err + } + + var instanceIds []string + var count uint32 = 0 + for _, v := range instances { + instanceIds = append(instanceIds, v.(*model.Instance).ID()) + count += 1 + if count >= limit { + break + } + } + + return 
instanceIds, nil +} diff --git a/store/boltdb/maintain_test.go b/store/boltdb/maintain_test.go index 298b55f9f..88916deb8 100644 --- a/store/boltdb/maintain_test.go +++ b/store/boltdb/maintain_test.go @@ -162,6 +162,77 @@ func TestMaintainStore_ListLeaderElections(t *testing.T) { } +func TestMaintainStore_getUnHealthyInstancesBefore(t *testing.T) { + handler, err := NewBoltHandler(&BoltConfig{FileName: "./table.bolt"}) + if err != nil { + t.Fatal(err) + } + defer func() { + handler.Close() + _ = os.RemoveAll("./table.bolt") + }() + + store := &maintainStore{handler: handler} + sStore := &serviceStore{handler: handler} + insStore := &instanceStore{handler: handler} + + svcId := "svcid1" + sStore.AddService(&model.Service{ + ID: svcId, + Name: svcId, + Namespace: svcId, + Token: svcId, + Owner: svcId, + Valid: true, + }) + + mtime := time.Date(2023, 3, 4, 11, 0, 0, 0, time.Local) + for i := 0; i < insCount; i++ { + nowt := commontime.Time2String(mtime) + err := insStore.AddInstance(&model.Instance{ + Proto: &apiservice.Instance{ + Id: &wrappers.StringValue{Value: "insid" + strconv.Itoa(i)}, + Host: &wrappers.StringValue{Value: "1.1.1." 
+ strconv.Itoa(i)}, + Port: &wrappers.UInt32Value{Value: uint32(i + 1)}, + Protocol: &wrappers.StringValue{Value: "grpc"}, + Weight: &wrappers.UInt32Value{Value: uint32(i + 1)}, + EnableHealthCheck: &wrappers.BoolValue{Value: true}, + Healthy: &wrappers.BoolValue{Value: true}, + Isolate: &wrappers.BoolValue{Value: true}, + Metadata: map[string]string{ + "insk1": "insv1", + "insk2": "insv2", + }, + Ctime: &wrappers.StringValue{Value: nowt}, + Mtime: &wrappers.StringValue{Value: nowt}, + Revision: &wrappers.StringValue{Value: "revision" + strconv.Itoa(i)}, + }, + ServiceID: svcId, + ServicePlatformID: "svcPlatId1", + Valid: true, + ModifyTime: time.Now(), + }) + if err != nil { + t.Fatal(err) + } + } + + toUnHealthyInstances := []interface{}{"insid1", "insid2", "insid3"} + err = insStore.BatchSetInstanceHealthStatus(toUnHealthyInstances, 0, "revision-11") + if err != nil { + t.Fatal(err) + } + + beforeTime := time.Date(2023, 3, 4, 11, 1, 0, 0, time.Local) + ids, err := store.getUnHealthyInstancesBefore(beforeTime, 2) + if err != nil { + t.Fatal(err) + } + if len(ids) != 2 { + t.Fatalf("count not match, expect cnt=%d, actual cnt=%d", 2, len(ids)) + } +} + func TestMain(m *testing.M) { setup() code := m.Run() diff --git a/store/maintain_api.go b/store/maintain_api.go index 0e77b9fb2..a86afa9a1 100644 --- a/store/maintain_api.go +++ b/store/maintain_api.go @@ -17,10 +17,15 @@ package store -import "github.com/polarismesh/polaris/common/model" +import ( + "time" + + "github.com/polarismesh/polaris/common/model" +) const ( ElectionKeySelfServiceChecker = "polaris.checker" + ElectionKeyMaintainJobPrefix = "MaintainJob." 
) type MaintainStore interface { @@ -38,6 +43,9 @@ type MaintainStore interface { // BatchCleanDeletedInstances batch clean soft deleted instances BatchCleanDeletedInstances(batchSize uint32) (uint32, error) + + // GetUnHealthyInstances get unhealthy instances which mtime time out + GetUnHealthyInstances(timeout time.Duration, limit uint32) ([]string, error) } // LeaderChangeEvent diff --git a/store/mock/api_mock.go b/store/mock/api_mock.go index c39e32649..2dd023511 100644 --- a/store/mock/api_mock.go +++ b/store/mock/api_mock.go @@ -9,7 +9,6 @@ import ( time "time" gomock "github.com/golang/mock/gomock" - model "github.com/polarismesh/polaris/common/model" store "github.com/polarismesh/polaris/store" ) @@ -1818,6 +1817,21 @@ func (mr *MockStoreMockRecorder) GetSystemServices() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSystemServices", reflect.TypeOf((*MockStore)(nil).GetSystemServices)) } +// GetUnHealthyInstances mocks base method. +func (m *MockStore) GetUnHealthyInstances(timeout time.Duration, limit uint32) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUnHealthyInstances", timeout, limit) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetUnHealthyInstances indicates an expected call of GetUnHealthyInstances. +func (mr *MockStoreMockRecorder) GetUnHealthyInstances(timeout, limit interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnHealthyInstances", reflect.TypeOf((*MockStore)(nil).GetUnHealthyInstances), timeout, limit) +} + // GetUnixSecond mocks base method. 
func (m *MockStore) GetUnixSecond(maxWait time.Duration) (int64, error) { m.ctrl.T.Helper() diff --git a/store/mysql/maintain.go b/store/mysql/maintain.go index 0e2f3fc39..613107476 100644 --- a/store/mysql/maintain.go +++ b/store/mysql/maintain.go @@ -448,3 +448,32 @@ func (m *maintainStore) BatchCleanDeletedInstances(batchSize uint32) (uint32, er }) return uint32(rows), err } + +func (m *maintainStore) GetUnHealthyInstances(timeout time.Duration, limit uint32) ([]string, error) { + log.Infof("[Store][database] get unhealthy instances which mtime timeout %s (%d)", timeout, limit) + queryStr := "select id from instance where flag=0 and enable_health_check=1 and health_status=0 " + + "and mtime < FROM_UNIXTIME(UNIX_TIMESTAMP(SYSDATE()) - ?) limit ?" + rows, err := m.master.Query(queryStr, int32(timeout.Seconds()), limit) + if err != nil { + log.Errorf("[Store][database] get unhealthy instances, err: %s", err.Error()) + return nil, store.Error(err) + } + + var instanceIds []string + defer rows.Close() + for rows.Next() { + var id string + err := rows.Scan(&id) + if err != nil { + log.Errorf("[Store][database] fetch unhealthy instance rows, err: %s", err.Error()) + return nil, store.Error(err) + } + instanceIds = append(instanceIds, id) + } + if err := rows.Err(); err != nil { + log.Errorf("[Store][database] fetch unhealthy instance rows next, err: %s", err.Error()) + return nil, store.Error(err) + } + + return instanceIds, nil +}