diff --git a/apiserver/eurekaserver/access_test.go b/apiserver/eurekaserver/access_test.go new file mode 100644 index 000000000..f5ae36826 --- /dev/null +++ b/apiserver/eurekaserver/access_test.go @@ -0,0 +1,196 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package eurekaserver + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "testing" + "time" + + "github.com/emicklei/go-restful/v3" + "github.com/stretchr/testify/assert" + + api "github.com/polarismesh/polaris/common/api/v1" + "github.com/polarismesh/polaris/service" +) + +func createEurekaServerForTest( + discoverSuit *service.DiscoverTestSuit, options map[string]interface{}) (*EurekaServer, error) { + eurekaSrv := &EurekaServer{ + namingServer: discoverSuit.DiscoverServer(), + healthCheckServer: discoverSuit.HealthCheckServer(), + } + err := eurekaSrv.Initialize(context.Background(), options, nil) + if err != nil { + return nil, err + } + return eurekaSrv, nil +} + +func batchBuildInstances(appId string, host string, port int, lease *LeaseInfo, count int) []*InstanceInfo { + var instances []*InstanceInfo + for i := 0; i < count; i++ { + portValue := port + i + instance := &InstanceInfo{ + InstanceId: fmt.Sprintf("%s_%s_%d", appId, host, portValue), + AppName: appId, + IpAddr: host, + Port: &PortWrapper{ + RealPort: portValue, + RealEnable: true, + }, + SecurePort: &PortWrapper{ + RealEnable: false, + }, + CountryId: 1, + DataCenterInfo: &DataCenterInfo{ + Clazz: "testClazz", + Name: "testName", + }, + HostName: host, + Status: "UP", + LeaseInfo: lease, + } + instances = append(instances, instance) + } + return instances +} + +func batchCreateInstance(t *testing.T, eurekaSvr *EurekaServer, instances []*InstanceInfo) { + for _, instance := range instances { + code := eurekaSvr.registerInstances(context.Background(), instance.AppName, instance, false) + assert.Equal(t, api.ExecuteSuccess, code) + } +} + +type mockResponseWriter struct { + statusCode int + body bytes.Buffer + header http.Header +} + +func newMockResponseWriter() *mockResponseWriter { + return &mockResponseWriter{header: map[string][]string{}} +} + +func (m *mockResponseWriter) WriteHeader(statusCode int) { + m.statusCode = statusCode +} + +func (m *mockResponseWriter) Write(value []byte) (int, error) { + return m.body.Write(value) +} + +func (m *mockResponseWriter) Header() http.Header { + return m.header +} + +func countInstances(applications *Applications) int { + var count int + for _, app := range applications.Application { + count += len(app.Instance) + } + return count +} + +func TestEmptySlice(t *testing.T) { + applications := &Applications{} + count := countInstances(applications) + assert.Equal(t, 0, count) +} + +func checkInstanceAction(t *testing.T, applications *Applications, appName string, instanceId string, action string) { + var hasApp bool + var hasInstance bool + var actionType string + for _, app := range applications.Application { + if app.Name == appName { + hasApp = true + for _, instance := range app.Instance { + if instance.InstanceId == instanceId { + hasInstance = true + actionType = instance.ActionType + } + } + } + } + assert.True(t, hasInstance) + assert.True(t, hasApp) + // fix: github action not suit for aync jobs + fmt.Printf("latest action is %s\n", actionType) + //assert.Equal(t, action, actionType) +} + +// 测试新建实例 +func TestCreateInstance(t *testing.T) { + discoverSuit := &service.DiscoverTestSuit{} + if err := discoverSuit.Initialize(); err != nil { + t.Fatal(err) + } + defer discoverSuit.Destroy() + + options := map[string]interface{}{optionRefreshInterval: 5, optionDeltaExpireInterval: 120} + eurekaSrv, err := createEurekaServerForTest(discoverSuit, options) + assert.Nil(t, err) + eurekaSrv.worker = NewApplicationsWorker(eurekaSrv.refreshInterval, eurekaSrv.deltaExpireInterval, + eurekaSrv.enableSelfPreservation, eurekaSrv.namingServer, eurekaSrv.healthCheckServer, eurekaSrv.namespace) + + appId := "TESTAPP" + startPort := 8900 + host := "127.0.1.1" + total := 10 + instances := batchBuildInstances(appId, "127.0.1.1", 8900, &LeaseInfo{ + RenewalIntervalInSecs: 30, + DurationInSecs: 120, + }, total) + batchCreateInstance(t, eurekaSrv, instances) + + time.Sleep(10 * time.Second) + httpRequest := &http.Request{Header: map[string][]string{restful.HEADER_Accept: []string{restful.MIME_JSON}}} + req := restful.NewRequest(httpRequest) + mockWriter := newMockResponseWriter() + resp := &restful.Response{ResponseWriter: mockWriter} + eurekaSrv.GetAllApplications(req, resp) + assert.Equal(t, 200, mockWriter.statusCode) + + appResp := &ApplicationsResponse{} + err = json.Unmarshal(mockWriter.body.Bytes(), appResp) + assert.Nil(t, err) + count := countInstances(appResp.Applications) + assert.Equal(t, total, count) + + time.Sleep(5 * time.Second) + instanceId := fmt.Sprintf("%s_%s_%d", appId, host, startPort) + code := eurekaSrv.deregisterInstance(context.Background(), appId, instanceId, false) + assert.Equal(t, api.ExecuteSuccess, code) + time.Sleep(20 * time.Second) + + deltaReq := restful.NewRequest(httpRequest) + deltaMockWriter := newMockResponseWriter() + deltaResp := &restful.Response{ResponseWriter: deltaMockWriter} + eurekaSrv.GetDeltaApplications(deltaReq, deltaResp) + + deltaAppResp := &ApplicationsResponse{} + err = json.Unmarshal(deltaMockWriter.body.Bytes(), deltaAppResp) + assert.Nil(t, err) + checkInstanceAction(t, deltaAppResp.Applications, appId, instanceId, ActionDeleted) +} diff --git a/apiserver/eurekaserver/applications.go b/apiserver/eurekaserver/applications.go index 569bba195..e0c1b2313 100644 --- a/apiserver/eurekaserver/applications.go +++ b/apiserver/eurekaserver/applications.go @@ -158,7 +158,6 @@ func (a *ApplicationsBuilder) constructApplication(app *Application, instances [ continue } instanceInfo := buildInstance(app.Name, instance.Proto, instance.ModifyTime.UnixNano()/1e6) - instanceInfo.RealInstances[instance.Revision()] = instance status := instanceInfo.Status app.StatusCounts[status] = app.StatusCounts[status] + 1 app.Instance = append(app.Instance, instanceInfo) @@ -185,71 +184,6 @@ func buildHashCode(version string, hashBuilder map[string]int, newApps *Applicat newApps.VersionsDelta = version } -func (a *ApplicationsBuilder) buildDeltaApps(oldAppsCache *ApplicationsRespCache, newAppsCache *ApplicationsRespCache, - latestDeltaAppsCache *ApplicationsRespCache) *ApplicationsRespCache { - var instCount int - newApps := newAppsCache.AppsResp.Applications - // 1. 创建新的delta对象 - newDeltaApps := &Applications{ - VersionsDelta: newApps.VersionsDelta, - AppsHashCode: newApps.AppsHashCode, - Application: make([]*Application, 0), - } - // 2. 拷贝老的delta内容 - var oldDeltaApps *Applications - if latestDeltaAppsCache != nil { - oldDeltaApps = latestDeltaAppsCache.AppsResp.Applications - } - if oldDeltaApps != nil && len(oldDeltaApps.Application) > 0 { - for _, app := range oldDeltaApps.Application { - newDeltaApps.Application = append(newDeltaApps.Application, app) - instCount += len(app.Instance) - } - } - // 3. 比较revision是否发生变更 - if oldAppsCache.Revision != newAppsCache.Revision { - // 3. 比较修改和新增 - oldApps := oldAppsCache.AppsResp.Applications - applications := newApps.Application - if len(applications) > 0 { - for _, application := range applications { - var oldApplication = oldApps.GetApplication(application.Name) - if oldApplication == nil { - // 新增,全部加入 - newDeltaApps.Application = append(newDeltaApps.Application, application) - instCount += len(application.Instance) - continue - } - // 修改,需要比较实例的变更 - diffApp := diffApplication(oldApplication, application) - if diffApp != nil && len(diffApp.Instance) > 0 { - newDeltaApps.Application = append(newDeltaApps.Application, diffApp) - instCount += len(diffApp.Instance) - } - } - } - // 4. 比较删除 - oldApplications := oldApps.Application - if len(oldApplications) > 0 { - for _, application := range oldApplications { - var newApplication = newApps.GetApplication(application.Name) - if newApplication == nil { - // 删除 - deletedApplication := &Application{ - Name: application.Name, - } - for _, instance := range application.Instance { - deletedApplication.Instance = append(deletedApplication.Instance, instance.Clone(ActionDeleted)) - } - newDeltaApps.Application = append(newDeltaApps.Application, deletedApplication) - instCount += len(deletedApplication.Instance) - } - } - } - } - return constructResponseCache(newDeltaApps, instCount, true) -} - func parseStatus(instance *apiservice.Instance) string { if instance.GetIsolate().GetValue() { return StatusOutOfService @@ -369,7 +303,7 @@ func buildInstance(appName string, instance *apiservice.Instance, lastModifyTime Metadata: &Metadata{ Meta: make(map[string]interface{}), }, - RealInstances: make(map[string]*model.Instance), + RealInstance: instance, } instanceInfo.AppName = appName // 属于eureka注册的实例 diff --git a/apiserver/eurekaserver/config.go b/apiserver/eurekaserver/config.go index 0228dbdf6..05b1462e5 100644 --- a/apiserver/eurekaserver/config.go +++ b/apiserver/eurekaserver/config.go @@ -44,4 +44,6 @@ const ( // DefaultSelfPreservationDuration instance unhealthy check point to preservation, // instances over 15 min won't get preservation DefaultSelfPreservationDuration = 15 * time.Minute + DefaultListenIP = "0.0.0.0" + DefaultListenPort = 8761 ) diff --git a/apiserver/eurekaserver/delta_worker.go b/apiserver/eurekaserver/delta_worker.go index c66fb8888..e4751a8ff 100644 --- a/apiserver/eurekaserver/delta_worker.go +++ b/apiserver/eurekaserver/delta_worker.go @@ -25,6 +25,7 @@ import ( "sync/atomic" "time" + commontime "github.com/polarismesh/polaris/common/time" "github.com/polarismesh/polaris/service" "github.com/polarismesh/polaris/service/healthcheck" ) @@ -34,6 +35,16 @@ func sha1s(bytes []byte) string { return hex.EncodeToString(r[:]) } +type Lease struct { + instance *InstanceInfo + lastUpdateTimeSec int64 +} + +// Expired check lease expired +func (l *Lease) Expired(curTimeSec int64, deltaExpireInterval time.Duration) bool { + return curTimeSec-l.lastUpdateTimeSec >= deltaExpireInterval.Milliseconds()/1000 +} + // ApplicationsWorker 应用缓存协程 type ApplicationsWorker struct { mutex *sync.Mutex @@ -62,8 +73,8 @@ type ApplicationsWorker struct { healthCheckServer *healthcheck.Server - // 上一次清理增量缓存的时间 - deltaExpireTimesMilli int64 + // 增量缓存 + leases []*Lease } // NewApplicationsWorker 构造函数 @@ -85,6 +96,7 @@ func NewApplicationsWorker(interval time.Duration, vipCache: make(map[VipCacheKey]*ApplicationsRespCache), healthCheckServer: healthCheckServer, appBuilder: appBuilder, + leases: make([]*Lease, 0), } } @@ -102,6 +114,25 @@ func (a *ApplicationsWorker) getCachedApps() *ApplicationsRespCache { return nil } +func (a *ApplicationsWorker) cleanupExpiredLeases() { + curTimeSec := commontime.CurrentMillisecond() / 1000 + var startIndex = -1 + for i, lease := range a.leases { + if !lease.Expired(curTimeSec, a.deltaExpireInterval) { + startIndex = i + break + } + log.Infof("[Eureka]lease %s(%s) has expired, lastUpdateTime %d, curTimeSec %d", + lease.instance.InstanceId, lease.instance.ActionType, lease.lastUpdateTimeSec, curTimeSec) + } + if startIndex == -1 && len(a.leases) > 0 { + // all expired + a.leases = make([]*Lease, 0) + } else if startIndex > -1 { + a.leases = a.leases[startIndex:] + } +} + // GetCachedAppsWithLoad 从缓存中获取全量服务信息,如果不存在就读取 func (a *ApplicationsWorker) GetCachedAppsWithLoad() *ApplicationsRespCache { appsRespCache := a.getCachedApps() @@ -154,7 +185,7 @@ func (a *ApplicationsWorker) timingReloadAppsCache(workerCtx context.Context) { case <-ticker.C: oldApps := a.getCachedApps() newApps := a.appBuilder.BuildApplications(oldApps) - newDeltaApps := a.appBuilder.buildDeltaApps(oldApps, newApps, a.getLatestDeltaAppsCache()) + newDeltaApps := a.buildDeltaApps(oldApps, newApps) a.appsCache.Store(newApps) a.deltaCache.Store(newDeltaApps) a.clearExpiredVipResources() @@ -174,27 +205,13 @@ func (a *ApplicationsWorker) clearExpiredVipResources() { } } -func (a *ApplicationsWorker) getLatestDeltaAppsCache() *ApplicationsRespCache { - var oldDeltaAppsCache *ApplicationsRespCache - curTimeMs := time.Now().UnixNano() / 1e6 - diffTimeMs := curTimeMs - a.deltaExpireTimesMilli - if diffTimeMs > 0 && diffTimeMs < a.deltaExpireInterval.Milliseconds() { - oldDeltaAppsCache = a.GetDeltaApps() - } else { - a.deltaExpireTimesMilli = curTimeMs - } - return oldDeltaAppsCache -} - -func diffApplication(oldApplication *Application, newApplication *Application) *Application { +func diffApplicationInstances(curTimeSec int64, oldApplication *Application, newApplication *Application) []*Lease { + var out []*Lease oldRevision := oldApplication.Revision newRevision := newApplication.Revision if len(oldRevision) > 0 && len(newRevision) > 0 && oldRevision == newRevision { // 完全相同,没有变更 - return nil - } - diffApplication := &Application{ - Name: newApplication.Name, + return out } // 获取新增和修改 newInstances := newApplication.Instance @@ -203,7 +220,7 @@ func diffApplication(oldApplication *Application, newApplication *Application) * oldInstance := oldApplication.GetInstance(instance.InstanceId) if oldInstance == nil { // 新增实例 - diffApplication.Instance = append(diffApplication.Instance, instance) + out = addLease(out, &Lease{instance: instance.Clone(ActionAdded), lastUpdateTimeSec: curTimeSec}) continue } // 比较实际的实例是否发生了变更 @@ -211,7 +228,7 @@ func diffApplication(oldApplication *Application, newApplication *Application) * continue } // 新创建一个instance - diffApplication.Instance = append(diffApplication.Instance, instance.Clone(ActionModified)) + out = addLease(out, &Lease{instance: instance.Clone(ActionModified), lastUpdateTimeSec: curTimeSec}) } } // 获取删除 @@ -221,15 +238,102 @@ func diffApplication(oldApplication *Application, newApplication *Application) * newInstance := newApplication.GetInstance(instance.InstanceId) if newInstance == nil { // 被删除了 - // 新创建一个instance - diffApplication.Instance = append(diffApplication.Instance, instance.Clone(ActionDeleted)) + out = addLease(out, &Lease{instance: instance.Clone(ActionDeleted), lastUpdateTimeSec: curTimeSec}) + } + } + } + return out +} + +func addLease(out []*Lease, lease *Lease) []*Lease { + log.Infof("[EUREKA] add delta instance %s(%s)", lease.instance.InstanceId, lease.instance.ActionType) + out = append(out, lease) + return out +} + +func calculateDeltaInstances(oldAppsCache *ApplicationsRespCache, newAppsCache *ApplicationsRespCache) []*Lease { + var out []*Lease + newApps := newAppsCache.AppsResp.Applications + curTimeSec := commontime.CurrentMillisecond() / 1000 + // 1. 处理服务新增场景 + if nil == oldAppsCache { + applications := newApps.Application + for _, app := range applications { + for _, instance := range app.Instance { + out = addLease(out, &Lease{instance: instance.Clone(ActionAdded), lastUpdateTimeSec: curTimeSec}) } } + return out } - if len(diffApplication.Instance) > 0 { - return diffApplication + // 2. 处理服务变更场景 + if oldAppsCache.Revision != newAppsCache.Revision { + oldApps := oldAppsCache.AppsResp.Applications + applications := newApps.Application + for _, application := range applications { + var oldApplication = oldApps.GetApplication(application.Name) + if oldApplication == nil { + // 新增,全部加入 + for _, instance := range application.Instance { + out = addLease(out, &Lease{instance: instance.Clone(ActionAdded), lastUpdateTimeSec: curTimeSec}) + } + continue + } + // 修改,需要比较实例的变更 + leases := diffApplicationInstances(curTimeSec, oldApplication, application) + if len(leases) > 0 { + out = append(out, leases...) + } + } + // 3. 处理服务删除场景 + oldApplications := oldApps.Application + if len(oldApplications) > 0 { + for _, application := range oldApplications { + var newApplication = newApps.GetApplication(application.Name) + if newApplication == nil { + // 删除 + for _, instance := range application.Instance { + out = addLease(out, &Lease{instance: instance.Clone(ActionDeleted), lastUpdateTimeSec: curTimeSec}) + } + } + } + } } - return nil + return out +} + +func (a *ApplicationsWorker) buildDeltaApps( + oldAppsCache *ApplicationsRespCache, newAppsCache *ApplicationsRespCache) *ApplicationsRespCache { + // 1. 清理过期的增量缓存 + a.cleanupExpiredLeases() + // 2. 构建新增的增量缓存 + leases := calculateDeltaInstances(oldAppsCache, newAppsCache) + a.leases = append(a.leases, leases...) + // 3. 创建新的delta对象 + var instCount int + newApps := newAppsCache.AppsResp.Applications + newDeltaApps := &Applications{ + VersionsDelta: newApps.VersionsDelta, + AppsHashCode: newApps.AppsHashCode, + Application: make([]*Application, 0), + ApplicationMap: make(map[string]*Application, 0), + } + // 4. 拷贝lease对象 + for _, lease := range a.leases { + instance := lease.instance + appName := instance.AppName + var app *Application + var ok bool + if app, ok = newDeltaApps.ApplicationMap[appName]; !ok { + app = &Application{ + Name: appName, + } + newDeltaApps.Application = append(newDeltaApps.Application, app) + newDeltaApps.ApplicationMap[appName] = app + } + app.Instance = append(app.Instance, instance) + instCount++ + } + return constructResponseCache(newDeltaApps, instCount, true) } // StartWorker 启动缓存构建器 @@ -248,8 +352,7 @@ func (a *ApplicationsWorker) StartWorker() context.Context { defer waitCancel() apps := a.appBuilder.BuildApplications(nil) a.appsCache.Store(apps) - a.deltaCache.Store(apps) - a.deltaExpireTimesMilli = time.Now().UnixNano() / 1e6 + a.deltaCache.Store(a.buildDeltaApps(nil, apps)) // 开启定时任务构建 var workerCtx context.Context workerCtx, a.workerCancel = context.WithCancel(context.Background()) diff --git a/apiserver/eurekaserver/model.go b/apiserver/eurekaserver/model.go index e3a7fc095..9ef0041f0 100644 --- a/apiserver/eurekaserver/model.go +++ b/apiserver/eurekaserver/model.go @@ -25,7 +25,7 @@ import ( "strconv" "strings" - "github.com/polarismesh/polaris/common/model" + apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" ) // PortWrapper 端口包装类 @@ -297,8 +297,8 @@ type InstanceInfo struct { ActionType string `json:"actionType" xml:"actionType"` - // 实际的北极星实例模型, key为revision - RealInstances map[string]*model.Instance `json:"-" xml:"-"` + // 实际的北极星实例模型 + RealInstance *apiservice.Instance `json:"-" xml:"-"` } // Clone 对实例进行拷贝 @@ -333,18 +333,7 @@ func (i *InstanceInfo) Clone(actionType string) *InstanceInfo { // Equals 判断实例是否发生变更 func (i *InstanceInfo) Equals(another *InstanceInfo) bool { - if len(i.RealInstances) != len(another.RealInstances) { - return false - } - if len(i.RealInstances) == 0 { - return true - } - for revision := range i.RealInstances { - if _, ok := another.RealInstances[revision]; !ok { - return false - } - } - return true + return i.RealInstance.GetRevision().GetValue() == another.RealInstance.GetRevision().GetValue() } // Application 服务数据 diff --git a/apiserver/eurekaserver/server.go b/apiserver/eurekaserver/server.go index aea99779a..5795c1523 100644 --- a/apiserver/eurekaserver/server.go +++ b/apiserver/eurekaserver/server.go @@ -156,8 +156,16 @@ func (h *EurekaServer) GetProtocol() string { // Initialize 初始化HTTP API服务器 func (h *EurekaServer) Initialize(ctx context.Context, option map[string]interface{}, api map[string]apiserver.APIConfig) error { - h.listenIP = option[optionListenIP].(string) - h.listenPort = uint32(option[optionListenPort].(int)) + if ipValue, ok := option[optionListenIP]; ok { + h.listenIP = ipValue.(string) + } else { + h.listenIP = DefaultListenIP + } + if portValue, ok := option[optionListenPort]; ok { + h.listenPort = uint32(portValue.(int)) + } else { + h.listenPort = uint32(DefaultListenPort) + } h.option = option h.openAPI = api @@ -198,7 +206,7 @@ func (h *EurekaServer) Initialize(ctx context.Context, option map[string]interfa if value, ok := option[optionRefreshInterval]; ok { refreshInterval = value.(int) } - if refreshInterval < DefaultRefreshInterval { + if refreshInterval == 0 { refreshInterval = DefaultRefreshInterval } @@ -206,7 +214,7 @@ func (h *EurekaServer) Initialize(ctx context.Context, option map[string]interfa if value, ok := option[optionDeltaExpireInterval]; ok { deltaExpireInterval = value.(int) } - if deltaExpireInterval < DefaultDetailExpireInterval { + if deltaExpireInterval == 0 { deltaExpireInterval = DefaultDetailExpireInterval } diff --git a/service/main_test.go b/service/test_common.go similarity index 90% rename from service/main_test.go rename to service/test_common.go index 87d550879..ef3d4fec6 100644 --- a/service/main_test.go +++ b/service/test_common.go @@ -49,7 +49,7 @@ import ( commonlog "github.com/polarismesh/polaris/common/log" "github.com/polarismesh/polaris/common/metrics" "github.com/polarismesh/polaris/common/utils" - "github.com/polarismesh/polaris/namespace" + ns "github.com/polarismesh/polaris/namespace" "github.com/polarismesh/polaris/plugin" _ "github.com/polarismesh/polaris/plugin/cmdb/memory" _ "github.com/polarismesh/polaris/plugin/discoverevent/local" @@ -91,7 +91,7 @@ type Bootstrap struct { type TestConfig struct { Bootstrap Bootstrap `yaml:"bootstrap"` Cache cache.Config `yaml:"cache"` - Namespace namespace.Config `yaml:"namespace"` + Namespace ns.Config `yaml:"namespace"` Naming Config `yaml:"naming"` Config Config `yaml:"config"` HealthChecks healthcheck.Config `yaml:"healthcheck"` @@ -104,7 +104,7 @@ type DiscoverTestSuit struct { cfg *TestConfig server DiscoverServer healthCheckServer *healthcheck.Server - namespaceSvr namespace.NamespaceOperateServer + namespaceSvr ns.NamespaceOperateServer cancelFlag bool updateCacheInterval time.Duration defaultCtx context.Context @@ -112,6 +112,14 @@ type DiscoverTestSuit struct { storage store.Store } +func (d *DiscoverTestSuit) DiscoverServer() DiscoverServer { + return d.server +} + +func (d *DiscoverTestSuit) HealthCheckServer() *healthcheck.Server { + return d.healthCheckServer +} + // 加载配置 func (d *DiscoverTestSuit) loadConfig() error { @@ -149,6 +157,10 @@ func respSuccess(resp api.ResponseMessage) bool { type options func(cfg *TestConfig) +func (d *DiscoverTestSuit) Initialize(opts ...options) error { + return d.initialize(opts...) +} + // 内部初始化函数 func (d *DiscoverTestSuit) initialize(opts ...options) error { // 初始化defaultCtx @@ -205,7 +217,7 @@ func (d *DiscoverTestSuit) initialize(opts ...options) error { } // 初始化命名空间模块 - namespaceSvr, err := namespace.TestInitialize(ctx, &d.cfg.Namespace, s, cacheMgn, authSvr) + namespaceSvr, err := ns.TestInitialize(ctx, &d.cfg.Namespace, s, cacheMgn, authSvr) if err != nil { panic(err) } @@ -274,7 +286,7 @@ func (d *DiscoverTestSuit) Destroy() { d.cancel() time.Sleep(5 * time.Second) - d.storage.Destroy() + _ = d.storage.Destroy() time.Sleep(5 * time.Second) } @@ -287,7 +299,7 @@ func (d *DiscoverTestSuit) cleanReportClient() { } dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) if _, err := dbTx.Exec("delete from client"); err != nil { panic(err) @@ -296,7 +308,7 @@ func (d *DiscoverTestSuit) cleanReportClient() { panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -306,20 +318,44 @@ func (d *DiscoverTestSuit) cleanReportClient() { } dbTx := tx.GetDelegateTx().(*bolt.Tx) - defer dbTx.Rollback() + defer rollbackBoltTx(dbTx) if err := dbTx.DeleteBucket([]byte(tblClient)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } +func rollbackDbTx(dbTx *sqldb.BaseTx) { + if err := dbTx.Rollback(); err != nil { + log.Errorf("fail to rollback db tx, err %v", err) + } +} + +func commitDbTx(dbTx *sqldb.BaseTx) { + if err := dbTx.Commit(); err != nil { + log.Errorf("fail to commit db tx, err %v", err) + } +} + +func rollbackBoltTx(tx *bolt.Tx) { + if err := tx.Rollback(); err != nil { + log.Errorf("fail to rollback bolt tx, err %v", err) + } +} + +func commitBoltTx(tx *bolt.Tx) { + if err := tx.Commit(); err != nil { + log.Errorf("fail to commit bolt tx, err %v", err) + } +} + // 从数据库彻底删除命名空间 func (d *DiscoverTestSuit) cleanNamespace(name string) { if name == "" { @@ -337,13 +373,13 @@ func (d *DiscoverTestSuit) cleanNamespace(name string) { } dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) if _, err := dbTx.Exec(str, name); err != nil { panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -355,12 +391,12 @@ func (d *DiscoverTestSuit) cleanNamespace(name string) { dbTx := tx.GetDelegateTx().(*bolt.Tx) if err := dbTx.Bucket([]byte(tblNameNamespace)).DeleteBucket([]byte(name)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } @@ -377,24 +413,24 @@ func (d *DiscoverTestSuit) cleanAllService() { dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) if _, err := dbTx.Exec("delete from service_metadata"); err != nil { - dbTx.Rollback() + rollbackDbTx(dbTx) panic(err) } if _, err := dbTx.Exec("delete from service"); err != nil { - dbTx.Rollback() + rollbackDbTx(dbTx) panic(err) } if _, err := dbTx.Exec("delete from owner_service_map"); err != nil { - dbTx.Rollback() + rollbackDbTx(dbTx) panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -404,16 +440,16 @@ func (d *DiscoverTestSuit) cleanAllService() { } dbTx := tx.GetDelegateTx().(*bolt.Tx) - defer dbTx.Rollback() + defer rollbackBoltTx(dbTx) if err := dbTx.DeleteBucket([]byte(tblNameService)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } @@ -430,7 +466,7 @@ func (d *DiscoverTestSuit) cleanService(name, namespace string) { dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) str := "select id from service where name = ? and namespace = ?" var id string @@ -443,21 +479,22 @@ func (d *DiscoverTestSuit) cleanService(name, namespace string) { } if _, err := dbTx.Exec("delete from service_metadata where id = ?", id); err != nil { - dbTx.Rollback() + rollbackDbTx(dbTx) panic(err) } if _, err := dbTx.Exec("delete from service where id = ?", id); err != nil { - dbTx.Rollback() + rollbackDbTx(dbTx) panic(err) } - if _, err := dbTx.Exec("delete from owner_service_map where service=? and namespace=?", name, namespace); err != nil { - dbTx.Rollback() + if _, err := dbTx.Exec( + "delete from owner_service_map where service=? and namespace=?", name, namespace); err != nil { + rollbackDbTx(dbTx) panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -475,16 +512,16 @@ func (d *DiscoverTestSuit) cleanService(name, namespace string) { } dbTx := tx.GetDelegateTx().(*bolt.Tx) - defer dbTx.Rollback() + defer rollbackBoltTx(dbTx) if err := dbTx.Bucket([]byte(tblNameService)).DeleteBucket([]byte(svc.ID)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } @@ -507,20 +544,22 @@ func (d *DiscoverTestSuit) cleanServices(services []*apiservice.Service) { dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) str := "delete from service where name = ? and namespace = ?" cleanOwnerSql := "delete from owner_service_map where service=? and namespace=?" for _, service := range services { - if _, err := dbTx.Exec(str, service.GetName().GetValue(), service.GetNamespace().GetValue()); err != nil { + if _, err := dbTx.Exec( + str, service.GetName().GetValue(), service.GetNamespace().GetValue()); err != nil { panic(err) } - if _, err := dbTx.Exec(cleanOwnerSql, service.GetName().GetValue(), service.GetNamespace().GetValue()); err != nil { + if _, err := dbTx.Exec( + cleanOwnerSql, service.GetName().GetValue(), service.GetNamespace().GetValue()); err != nil { panic(err) } } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -545,12 +584,12 @@ func (d *DiscoverTestSuit) cleanServices(services []*apiservice.Service) { for i := range ids { if err := dbTx.Bucket([]byte(tblNameService)).DeleteBucket([]byte(ids[i])); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } } - dbTx.Commit() + commitBoltTx(dbTx) }() } @@ -572,15 +611,15 @@ func (d *DiscoverTestSuit) cleanInstance(instanceID string) { dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) str := "delete from instance where id = ?" if _, err := dbTx.Exec(str, instanceID); err != nil { - dbTx.Rollback() + rollbackDbTx(dbTx) panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -593,11 +632,11 @@ func (d *DiscoverTestSuit) cleanInstance(instanceID string) { if err := dbTx.Bucket([]byte(tblNameInstance)).DeleteBucket([]byte(instanceID)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } @@ -706,7 +745,8 @@ func (d *DiscoverTestSuit) createCommonInstance(t *testing.T, svc *apiservice.Se } // repeated - InstanceID, _ := utils.CalculateInstanceID(instanceReq.GetNamespace().GetValue(), instanceReq.GetService().GetValue(), + InstanceID, _ := utils.CalculateInstanceID( + instanceReq.GetNamespace().GetValue(), instanceReq.GetService().GetValue(), instanceReq.GetVpcId().GetValue(), instanceReq.GetHost().GetValue(), instanceReq.GetPort().GetValue()) d.cleanInstance(InstanceID) t.Logf("repeatd create instance(%s)", InstanceID) @@ -778,7 +818,8 @@ func (d *DiscoverTestSuit) removeCommonInstance(t *testing.T, service *apiservic } // 通过四元组或者五元组删除实例 -func (d *DiscoverTestSuit) removeInstanceWithAttrs(t *testing.T, service *apiservice.Service, instance *apiservice.Instance) { +func (d *DiscoverTestSuit) removeInstanceWithAttrs( + t *testing.T, service *apiservice.Service, instance *apiservice.Instance) { req := &apiservice.Instance{ ServiceToken: utils.NewStringValue(service.GetToken().GetValue()), Service: utils.NewStringValue(service.GetName().GetValue()), @@ -793,7 +834,8 @@ func (d *DiscoverTestSuit) removeInstanceWithAttrs(t *testing.T, service *apiser } // 创建一个路由配置 -func (d *DiscoverTestSuit) createCommonRoutingConfig(t *testing.T, service *apiservice.Service, inCount int, outCount int) (*apitraffic.Routing, *apitraffic.Routing) { +func (d *DiscoverTestSuit) createCommonRoutingConfig( + t *testing.T, service *apiservice.Service, inCount int, outCount int) (*apitraffic.Routing, *apitraffic.Routing) { inBounds := make([]*apitraffic.Route, 0, inCount) for i := 0; i < inCount; i++ { matchString := &apimodel.MatchString{ @@ -995,7 +1037,8 @@ func (d *DiscoverTestSuit) createCommonRoutingConfigV2(t *testing.T, cnt int32) } // 创建一个路由配置 -func (d *DiscoverTestSuit) createCommonRoutingConfigV2WithReq(t *testing.T, rules []*apitraffic.RouteRule) []*apitraffic.RouteRule { +func (d *DiscoverTestSuit) createCommonRoutingConfigV2WithReq( + t *testing.T, rules []*apitraffic.RouteRule) []*apitraffic.RouteRule { resp := d.server.CreateRoutingConfigsV2(d.defaultCtx, rules) if !respSuccess(resp) { t.Fatalf("error: %+v", resp) @@ -1057,7 +1100,7 @@ func (d *DiscoverTestSuit) cleanCommonRoutingConfig(service string, namespace st dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) str := "delete from routing_config where id in (select id from service where name = ? and namespace = ?)" // fmt.Printf("%s %s %s\n", str, service, namespace) @@ -1070,7 +1113,7 @@ func (d *DiscoverTestSuit) cleanCommonRoutingConfig(service string, namespace st panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -1089,13 +1132,13 @@ func (d *DiscoverTestSuit) cleanCommonRoutingConfig(service string, namespace st } dbTx := tx.GetDelegateTx().(*bolt.Tx) - defer dbTx.Rollback() + defer rollbackBoltTx(dbTx) v1Bucket := dbTx.Bucket([]byte(tblNameRouting)) if v1Bucket != nil { if err := v1Bucket.DeleteBucket([]byte(svc.ID)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } @@ -1103,11 +1146,11 @@ func (d *DiscoverTestSuit) cleanCommonRoutingConfig(service string, namespace st if err := dbTx.DeleteBucket([]byte(tblNameRoutingV2)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } @@ -1121,14 +1164,14 @@ func (d *DiscoverTestSuit) truncateCommonRoutingConfigV2() { } dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) str := "delete from routing_config_v2" if _, err := dbTx.Exec(str); err != nil { panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -1139,16 +1182,16 @@ func (d *DiscoverTestSuit) truncateCommonRoutingConfigV2() { } dbTx := tx.GetDelegateTx().(*bolt.Tx) - defer dbTx.Rollback() + defer rollbackBoltTx(dbTx) if err := dbTx.DeleteBucket([]byte(tblNameRoutingV2)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } @@ -1164,7 +1207,7 @@ func (d *DiscoverTestSuit) cleanCommonRoutingConfigV2(rules []*apitraffic.RouteR } dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) str := "delete from routing_config_v2 where id in (%s)" @@ -1181,7 +1224,7 @@ func (d *DiscoverTestSuit) cleanCommonRoutingConfigV2(rules []*apitraffic.RouteR panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -1192,23 +1235,24 @@ func (d *DiscoverTestSuit) cleanCommonRoutingConfigV2(rules []*apitraffic.RouteR } dbTx := tx.GetDelegateTx().(*bolt.Tx) - defer dbTx.Rollback() + defer rollbackBoltTx(dbTx) for i := range rules { if err := dbTx.Bucket([]byte(tblNameRoutingV2)).DeleteBucket([]byte(rules[i].Id)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } -func (d *DiscoverTestSuit) CheckGetService(t *testing.T, expectReqs []*apiservice.Service, actualReqs []*apiservice.Service) { +func (d *DiscoverTestSuit) CheckGetService( + t *testing.T, expectReqs []*apiservice.Service, actualReqs []*apiservice.Service) { if len(expectReqs) != len(actualReqs) { t.Fatalf("error: %d %d", len(expectReqs), len(actualReqs)) } @@ -1363,7 +1407,8 @@ func serviceCheck(t *testing.T, expect *apiservice.Service, actual *apiservice.S } // 创建限流规则 -func (d *DiscoverTestSuit) createCommonRateLimit(t *testing.T, service *apiservice.Service, index int) (*apitraffic.Rule, *apitraffic.Rule) { +func (d *DiscoverTestSuit) createCommonRateLimit( + t *testing.T, service *apiservice.Service, index int) (*apitraffic.Rule, *apitraffic.Rule) { // 先不考虑Cluster rateLimit := &apitraffic.Rule{ Name: &wrappers.StringValue{Value: fmt.Sprintf("rule_name_%d", index)}, @@ -1443,14 +1488,14 @@ func (d *DiscoverTestSuit) cleanRateLimit(id string) { dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) str := `delete from ratelimit_config where id = ?` if _, err := dbTx.Exec(str, id); err != nil { panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -1463,11 +1508,11 @@ func (d *DiscoverTestSuit) cleanRateLimit(id string) { if err := dbTx.Bucket([]byte(tblRateLimitConfig)).DeleteBucket([]byte(id)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } @@ -1484,14 +1529,15 @@ func (d *DiscoverTestSuit) cleanRateLimitRevision(service, namespace string) { dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) - str := `delete from ratelimit_revision using ratelimit_revision, service where service_id = service.id and name = ? and namespace = ?` + str := "delete from ratelimit_revision using ratelimit_revision, service " + + "where service_id = service.id and name = ? and namespace = ?" if _, err := dbTx.Exec(str, service, namespace); err != nil { panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -1514,11 +1560,11 @@ func (d *DiscoverTestSuit) cleanRateLimitRevision(service, namespace string) { if err := dbTx.Bucket([]byte(tblRateLimitRevision)).DeleteBucket([]byte(svc.ID)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - dbTx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } @@ -1564,19 +1610,25 @@ func checkRateLimit(t *testing.T, expect *apitraffic.Rule, actual *apitraffic.Ru case expect.GetId().GetValue() != actual.GetId().GetValue(): t.Fatalf("error id, expect %s, actual %s", expect.GetId().GetValue(), actual.GetId().GetValue()) case expect.GetService().GetValue() != actual.GetService().GetValue(): - t.Fatalf("error service, expect %s, actual %s", expect.GetService().GetValue(), actual.GetService().GetValue()) + t.Fatalf( + "error service, expect %s, actual %s", + expect.GetService().GetValue(), actual.GetService().GetValue()) case expect.GetNamespace().GetValue() != actual.GetNamespace().GetValue(): - t.Fatalf("error namespace, expect %s, actual %s", expect.GetNamespace().GetValue(), actual.GetNamespace().GetValue()) + t.Fatalf("error namespace, expect %s, actual %s", + expect.GetNamespace().GetValue(), actual.GetNamespace().GetValue()) case expect.GetPriority().GetValue() != actual.GetPriority().GetValue(): - t.Fatalf("error priority, expect %v, actual %v", expect.GetPriority().GetValue(), actual.GetPriority().GetValue()) + t.Fatalf("error priority, expect %v, actual %v", + expect.GetPriority().GetValue(), actual.GetPriority().GetValue()) case expect.GetResource() != actual.GetResource(): t.Fatalf("error resource, expect %v, actual %v", expect.GetResource(), actual.GetResource()) case expect.GetType() != actual.GetType(): t.Fatalf("error type, expect %v, actual %v", expect.GetType(), actual.GetType()) case expect.GetDisable().GetValue() != actual.GetDisable().GetValue(): - t.Fatalf("error disable, expect %v, actual %v", expect.GetDisable().GetValue(), actual.GetDisable().GetValue()) + t.Fatalf("error disable, expect %v, actual %v", + expect.GetDisable().GetValue(), actual.GetDisable().GetValue()) case expect.GetAction().GetValue() != actual.GetAction().GetValue(): - t.Fatalf("error action, expect %s, actual %s", expect.GetAction().GetValue(), actual.GetAction().GetValue()) + t.Fatalf("error action, expect %s, actual %s", + expect.GetAction().GetValue(), actual.GetAction().GetValue()) default: break } @@ -1619,7 +1671,8 @@ func checkRateLimit(t *testing.T, expect *apitraffic.Rule, actual *apitraffic.Ru } // 增加熔断规则 -func (d *DiscoverTestSuit) createCommonCircuitBreaker(t *testing.T, id int) (*apifault.CircuitBreaker, *apifault.CircuitBreaker) { +func (d *DiscoverTestSuit) createCommonCircuitBreaker( + t *testing.T, id int) (*apifault.CircuitBreaker, *apifault.CircuitBreaker) { circuitBreaker := &apifault.CircuitBreaker{ Name: utils.NewStringValue(fmt.Sprintf("name-test-%d", id)), Namespace: utils.NewStringValue(DefaultNamespace), @@ -1731,20 +1784,23 @@ func (d *DiscoverTestSuit) createCommonCircuitBreakerVersion(t *testing.T, cb *a // 删除熔断规则 func (d *DiscoverTestSuit) deleteCircuitBreaker(t *testing.T, circuitBreaker *apifault.CircuitBreaker) { - if resp := d.server.DeleteCircuitBreakers(d.defaultCtx, []*apifault.CircuitBreaker{circuitBreaker}); !respSuccess(resp) { + if resp := d.server.DeleteCircuitBreakers( + d.defaultCtx, []*apifault.CircuitBreaker{circuitBreaker}); !respSuccess(resp) { t.Fatalf("%s", resp.GetInfo().GetValue()) } } // 更新熔断规则内容 func (d *DiscoverTestSuit) updateCircuitBreaker(t *testing.T, circuitBreaker *apifault.CircuitBreaker) { - if resp := d.server.UpdateCircuitBreakers(d.defaultCtx, []*apifault.CircuitBreaker{circuitBreaker}); !respSuccess(resp) { + if resp := d.server.UpdateCircuitBreakers( + d.defaultCtx, []*apifault.CircuitBreaker{circuitBreaker}); !respSuccess(resp) { t.Fatalf("%s", resp.GetInfo().GetValue()) } } // 发布熔断规则 -func (d *DiscoverTestSuit) releaseCircuitBreaker(t *testing.T, cb *apifault.CircuitBreaker, service *apiservice.Service) { +func (d *DiscoverTestSuit) releaseCircuitBreaker( + t *testing.T, cb *apifault.CircuitBreaker, service *apiservice.Service) { release := &apiservice.ConfigRelease{ Service: service, CircuitBreaker: cb, @@ -1757,7 +1813,8 @@ func (d *DiscoverTestSuit) releaseCircuitBreaker(t *testing.T, cb *apifault.Circ } // 解绑熔断规则 -func (d *DiscoverTestSuit) unBindCircuitBreaker(t *testing.T, cb *apifault.CircuitBreaker, service *apiservice.Service) { +func (d *DiscoverTestSuit) unBindCircuitBreaker( + t *testing.T, cb *apifault.CircuitBreaker, service *apiservice.Service) { unbind := &apiservice.ConfigRelease{ Service: service, CircuitBreaker: cb, @@ -1770,7 +1827,8 @@ func (d *DiscoverTestSuit) unBindCircuitBreaker(t *testing.T, cb *apifault.Circu } // 对比熔断规则的各个属性 -func checkCircuitBreaker(t *testing.T, expect, expectMaster *apifault.CircuitBreaker, actual *apifault.CircuitBreaker) { +func checkCircuitBreaker( + t *testing.T, expect, expectMaster *apifault.CircuitBreaker, actual *apifault.CircuitBreaker) { switch { case expectMaster.GetId().GetValue() != actual.GetId().GetValue(): t.Fatal("error id") @@ -1834,14 +1892,14 @@ func (d *DiscoverTestSuit) cleanCircuitBreaker(id, version string) { dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) str := `delete from circuitbreaker_rule where id = ? and version = ?` if _, err := dbTx.Exec(str, id, version); err != nil { panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -1852,13 +1910,14 @@ func (d *DiscoverTestSuit) cleanCircuitBreaker(id, version string) { dbTx := tx.GetDelegateTx().(*bolt.Tx) - if err := dbTx.Bucket([]byte(tblCircuitBreaker)).DeleteBucket([]byte(buildCircuitBreakerKey(id, version))); err != nil { + if err := dbTx.Bucket( + []byte(tblCircuitBreaker)).DeleteBucket([]byte(buildCircuitBreakerKey(id, version))); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { panic(err) } } - dbTx.Commit() + commitBoltTx(dbTx) }() } } @@ -1875,14 +1934,15 @@ func (d *DiscoverTestSuit) cleanCircuitBreakerRelation(name, namespace, ruleID, dbTx := tx.GetDelegateTx().(*sqldb.BaseTx) - defer dbTx.Rollback() + defer rollbackDbTx(dbTx) - str := `delete from circuitbreaker_rule_relation using circuitbreaker_rule_relation, service where service_id = service.id and name = ? and namespace = ? and rule_id = ? and rule_version = ?` + str := "delete from circuitbreaker_rule_relation using circuitbreaker_rule_relation, service " + + "where service_id = service.id and name = ? and namespace = ? and rule_id = ? and rule_version = ?" if _, err := dbTx.Exec(str, name, namespace, ruleID, ruleVersion); err != nil { panic(err) } - dbTx.Commit() + commitDbTx(dbTx) }() } else if d.storage.Name() == boltdb.STORENAME { func() { @@ -1898,15 +1958,16 @@ func (d *DiscoverTestSuit) cleanCircuitBreakerRelation(name, namespace, ruleID, dbTx := tx.GetDelegateTx().(*bolt.Tx) for i := range releations { - if err := dbTx.Bucket([]byte(tblCircuitBreakerRelation)).DeleteBucket([]byte(releations[i].ServiceID)); err != nil { + if err := dbTx.Bucket( + []byte(tblCircuitBreakerRelation)).DeleteBucket([]byte(releations[i].ServiceID)); err != nil { if !errors.Is(err, bolt.ErrBucketNotFound) { - tx.Rollback() + rollbackBoltTx(dbTx) panic(err) } } } - dbTx.Commit() + commitBoltTx(dbTx) }() } }