From b099afd5948f73007ef8283e969904d0a655ef5e Mon Sep 17 00:00:00 2001 From: andrewshan Date: Fri, 24 Feb 2023 20:32:42 +0800 Subject: [PATCH 1/8] =?UTF-8?q?fix:=20eureka=20updateStatus=E9=89=B4?= =?UTF-8?q?=E6=9D=83=E5=A4=B1=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apiserver/eurekaserver/write.go | 8 ++-- service/api_v1.go | 3 ++ service/client_v1_authability.go | 17 ++++++++ service/instance.go | 1 + service/instance_test.go | 70 +++++++++++++++++++++++++++++++- 5 files changed, 93 insertions(+), 6 deletions(-) diff --git a/apiserver/eurekaserver/write.go b/apiserver/eurekaserver/write.go index 627f8adb1..806cf6e5b 100644 --- a/apiserver/eurekaserver/write.go +++ b/apiserver/eurekaserver/write.go @@ -217,8 +217,8 @@ func (h *EurekaServer) updateStatus( } ctx = context.WithValue( ctx, model.CtxEventKeyMetadata, map[string]string{MetadataReplicate: strconv.FormatBool(replicated)}) - resp := h.namingServer.UpdateInstances(ctx, - []*apiservice.Instance{{Id: &wrappers.StringValue{Value: instanceId}, Isolate: &wrappers.BoolValue{Value: isolated}}}) + resp := h.namingServer.UpdateInstance(ctx, &apiservice.Instance{ + Id: &wrappers.StringValue{Value: instanceId}, Isolate: &wrappers.BoolValue{Value: isolated}}) return resp.GetCode().GetValue() } @@ -237,7 +237,7 @@ func (h *EurekaServer) renew(ctx context.Context, appId string, instanceId strin } func (h *EurekaServer) updateMetadata(ctx context.Context, instanceId string, metadata map[string]string) uint32 { - resp := h.namingServer.UpdateInstances(ctx, - []*apiservice.Instance{{Id: &wrappers.StringValue{Value: instanceId}, Metadata: metadata}}) + resp := h.namingServer.UpdateInstance(ctx, + &apiservice.Instance{Id: &wrappers.StringValue{Value: instanceId}, Metadata: metadata}) return resp.GetCode().GetValue() } diff --git a/service/api_v1.go b/service/api_v1.go index df56ce2b0..a990b790f 100644 --- a/service/api_v1.go +++ b/service/api_v1.go @@ -226,6 +226,9 @@ type ClientServer interface { GetRouterConfigWithCache(ctx context.Context, req *apiservice.Service) *apiservice.DiscoverResponse GetFaultDetectWithCache(ctx context.Context, req *apiservice.Service) *apiservice.DiscoverResponse + + // UpdateInstance update one instance by client + UpdateInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response } // L5OperateServer L5 related operations diff --git a/service/client_v1_authability.go b/service/client_v1_authability.go index e28dfaa07..3568f5412 100644 --- a/service/client_v1_authability.go +++ b/service/client_v1_authability.go @@ -108,3 +108,20 @@ func (svr *serverAuthAbility) GetRouterConfigWithCache( ctx context.Context, req *apiservice.Service) *apiservice.DiscoverResponse { return svr.targetServer.GetRouterConfigWithCache(ctx, req) } + +// UpdateInstance update single instance +func (svr *serverAuthAbility) UpdateInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response { + authCtx := svr.collectClientInstanceAuthContext( + ctx, []*apiservice.Instance{req}, model.Modify, "UpdateInstance") + + _, err := svr.authMgn.CheckClientPermission(authCtx) + if err != nil { + resp := api.NewResponseWithMsg(convertToErrCode(err), err.Error()) + return resp + } + + ctx = authCtx.GetRequestContext() + ctx = context.WithValue(ctx, utils.ContextAuthContextKey, authCtx) + + return svr.targetServer.UpdateInstance(ctx, req) +} diff --git a/service/instance.go b/service/instance.go index f63534d35..1c2e43103 100644 --- a/service/instance.go +++ b/service/instance.go @@ -40,6 +40,7 @@ import ( var ( // InstanceFilterAttributes 查询实例支持的过滤字段 InstanceFilterAttributes = map[string]bool{ + "id": true, "service": true, // 服务name "namespace": true, // 服务namespace "host": true, diff --git a/service/instance_test.go b/service/instance_test.go index 7366ff732..ad23e63dc 100644 --- a/service/instance_test.go +++ b/service/instance_test.go @@ -1641,13 +1641,79 @@ func TestInstanceNoNeedUpdate(t *testing.T) { oldHealthCheck := instanceReq.GetHealthCheck() instanceReq.HealthCheck = nil defer func() { instanceReq.HealthCheck = oldHealthCheck }() - So(discoverSuit.server.UpdateInstances(discoverSuit.defaultCtx, []*apiservice.Instance{instanceReq}).GetCode().GetValue(), ShouldEqual, api.NoNeedUpdate) + So(discoverSuit.server.UpdateInstances(discoverSuit.defaultCtx, + []*apiservice.Instance{instanceReq}).GetCode().GetValue(), ShouldEqual, api.NoNeedUpdate) + }) +} + +func TestUpdateInstanceField(t *testing.T) { + discoverSuit := &DiscoverTestSuit{} + if err := discoverSuit.initialize(); err != nil { + t.Fatal(err) + } + defer discoverSuit.Destroy() + + _, serviceResp := discoverSuit.createCommonService(t, 181) + defer discoverSuit.cleanServiceName(serviceResp.GetName().GetValue(), serviceResp.GetNamespace().GetValue()) + + _, instanceResp := discoverSuit.createCommonInstance(t, serviceResp, 181) + defer discoverSuit.cleanInstance(instanceResp.GetId().GetValue()) + instId := instanceResp.GetId().GetValue() + Convey("metadata变更", t, func() { + request := &apiservice.Instance{Id: wrapperspb.String(instId)} + request.Metadata = map[string]string{} + So(discoverSuit.server.UpdateInstance( + discoverSuit.defaultCtx, request).GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) + + request.Metadata = map[string]string{"123": "456", "789": "abc", "135": "246"} + So(discoverSuit.server.UpdateInstance( + discoverSuit.defaultCtx, request).GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) + + query := map[string]string{ + "id": instId, + "service": serviceResp.GetName().GetValue(), + "namespace": serviceResp.GetNamespace().GetValue(), + "offset": "0", + "limit": "10", + } + batchResp := discoverSuit.server.GetInstances(discoverSuit.defaultCtx, query) + So(batchResp.GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) + So(len(batchResp.Instances), ShouldEqual, 1) + So(batchResp.Instances[0].Host.GetValue(), ShouldEqual, instanceResp.Host.GetValue()) + So(len(batchResp.Instances[0].Metadata), ShouldEqual, len(request.Metadata)) }) + + Convey("isolate变更", t, func() { + request := &apiservice.Instance{Id: wrapperspb.String(instId)} + request.Isolate = wrapperspb.Bool(true) + So(discoverSuit.server.UpdateInstance( + discoverSuit.defaultCtx, request).GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) + query := map[string]string{ + "id": instId, + "service": serviceResp.GetName().GetValue(), + "namespace": serviceResp.GetNamespace().GetValue(), + "offset": "0", + "limit": "10", + } + batchResp := discoverSuit.server.GetInstances(discoverSuit.defaultCtx, query) + So(batchResp.GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) + So(len(batchResp.Instances), ShouldEqual, 1) + So(batchResp.Instances[0].Isolate.GetValue(), ShouldEqual, true) + + request.Isolate = wrapperspb.Bool(false) + So(discoverSuit.server.UpdateInstance( + discoverSuit.defaultCtx, request).GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) + batchResp = discoverSuit.server.GetInstances(discoverSuit.defaultCtx, query) + So(batchResp.GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) + So(len(batchResp.Instances), ShouldEqual, 1) + So(batchResp.Instances[0].Isolate.GetValue(), ShouldEqual, false) + }) + } // 实例数据更新测试 // 部分数据变更,触发更新 -func TestUpdateInstanceFiled(t *testing.T) { +func TestUpdateInstancesFiled(t *testing.T) { discoverSuit := &DiscoverTestSuit{} if err := discoverSuit.initialize(); err != nil { From c29178d19e5cc816720a20acc806b98d4ef264e2 Mon Sep 17 00:00:00 2001 From: andrewshan Date: Sat, 25 Feb 2023 00:25:24 +0800 Subject: [PATCH 2/8] =?UTF-8?q?fix=EF=BC=9Adb=20integrate=20test=20run=20f?= =?UTF-8?q?ailed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- service/instance.go | 1 - service/instance_test.go | 37 ++++++++++--------------------------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/service/instance.go b/service/instance.go index 1c2e43103..f63534d35 100644 --- a/service/instance.go +++ b/service/instance.go @@ -40,7 +40,6 @@ import ( var ( // InstanceFilterAttributes 查询实例支持的过滤字段 InstanceFilterAttributes = map[string]bool{ - "id": true, "service": true, // 服务name "namespace": true, // 服务namespace "host": true, diff --git a/service/instance_test.go b/service/instance_test.go index ad23e63dc..30cc4063f 100644 --- a/service/instance_test.go +++ b/service/instance_test.go @@ -1669,18 +1669,9 @@ func TestUpdateInstanceField(t *testing.T) { So(discoverSuit.server.UpdateInstance( discoverSuit.defaultCtx, request).GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) - query := map[string]string{ - "id": instId, - "service": serviceResp.GetName().GetValue(), - "namespace": serviceResp.GetNamespace().GetValue(), - "offset": "0", - "limit": "10", - } - batchResp := discoverSuit.server.GetInstances(discoverSuit.defaultCtx, query) - So(batchResp.GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) - So(len(batchResp.Instances), ShouldEqual, 1) - So(batchResp.Instances[0].Host.GetValue(), ShouldEqual, instanceResp.Host.GetValue()) - So(len(batchResp.Instances[0].Metadata), ShouldEqual, len(request.Metadata)) + instance, err := discoverSuit.storage.GetInstance(instId) + So(err, ShouldBeNil) + So(instance.Proto.Host.GetValue(), ShouldEqual, instanceResp.Host.GetValue()) }) Convey("isolate变更", t, func() { @@ -1688,25 +1679,17 @@ func TestUpdateInstanceField(t *testing.T) { request.Isolate = wrapperspb.Bool(true) So(discoverSuit.server.UpdateInstance( discoverSuit.defaultCtx, request).GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) - query := map[string]string{ - "id": instId, - "service": serviceResp.GetName().GetValue(), - "namespace": serviceResp.GetNamespace().GetValue(), - "offset": "0", - "limit": "10", - } - batchResp := discoverSuit.server.GetInstances(discoverSuit.defaultCtx, query) - So(batchResp.GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) - So(len(batchResp.Instances), ShouldEqual, 1) - So(batchResp.Instances[0].Isolate.GetValue(), ShouldEqual, true) + instance, err := discoverSuit.storage.GetInstance(instId) + So(err, ShouldBeNil) + So(instance.Proto.Isolate.GetValue(), ShouldEqual, true) request.Isolate = wrapperspb.Bool(false) So(discoverSuit.server.UpdateInstance( discoverSuit.defaultCtx, request).GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) - batchResp = discoverSuit.server.GetInstances(discoverSuit.defaultCtx, query) - So(batchResp.GetCode().GetValue(), ShouldEqual, api.ExecuteSuccess) - So(len(batchResp.Instances), ShouldEqual, 1) - So(batchResp.Instances[0].Isolate.GetValue(), ShouldEqual, false) + + instance, err = discoverSuit.storage.GetInstance(instId) + So(err, ShouldBeNil) + So(instance.Proto.Isolate.GetValue(), ShouldEqual, false) }) } From 3258783ab8c2848a2fe4ec6081b94a20b47cf180 Mon Sep 17 00:00:00 2001 From: andrewshan Date: Sat, 25 Feb 2023 18:09:02 +0800 Subject: [PATCH 3/8] fix: database deadlock issues --- store/mysql/service.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/store/mysql/service.go b/store/mysql/service.go index d60334a79..5d8af6cfe 100644 --- a/store/mysql/service.go +++ b/store/mysql/service.go @@ -41,11 +41,6 @@ func (ss *serviceStore) AddService(s *model.Service) error { "add service missing some params, id is %s, name is %s, namespace is %s", s.ID, s.Name, s.Namespace)) } - // 先清理无效数据 - if err := ss.cleanService(s.Name, s.Namespace); err != nil { - return err - } - err := RetryTransaction("addService", func() error { return ss.addService(s) }) @@ -73,6 +68,11 @@ func (ss *serviceStore) addService(s *model.Service) error { return store.NewStatusError(store.NotFoundNamespace, "not found namespace") } + // 先清理无效数据 + if err := ss.cleanService(s.Name, s.Namespace); err != nil { + return err + } + // 填充main表 if err := addServiceMain(tx, s); err != nil { log.Errorf("[Store][database] add service table err: %s", err.Error()) @@ -725,8 +725,7 @@ func (ss *serviceStore) getServiceByID(serviceID string) (*model.Service, error) return out[0], nil } -// cleanService 清理无效数据,flag=1的数据 -// 只需要删除service即可 +// cleanService 清理无效数据,flag=1的数据,只需要删除service即可 func (ss *serviceStore) cleanService(name string, namespace string) error { log.Infof("[Store][database] clean service(%s, %s)", name, namespace) str := "delete from service where name = ? and namespace = ? and flag = 1" From 2fc05d925392f830a60cbab92edb9114cca630f7 Mon Sep 17 00:00:00 2001 From: andrewshan Date: Sun, 26 Feb 2023 17:12:09 +0800 Subject: [PATCH 4/8] fix: all write db operation should be in transaction --- store/mysql/circuitbreaker_config.go | 196 ++++++++++++++++---------- store/mysql/client.go | 12 +- store/mysql/default.go | 6 +- store/mysql/fault_detect_config.go | 2 +- store/mysql/group.go | 10 +- store/mysql/instance.go | 199 ++++++++++++++++----------- store/mysql/l5.go | 13 +- store/mysql/maintain.go | 135 +++++++++++------- store/mysql/namespace.go | 83 +++++++---- store/mysql/ratelimit_config.go | 38 ++--- store/mysql/routing_config.go | 87 +++++++----- store/mysql/service.go | 45 +++--- 12 files changed, 510 insertions(+), 316 deletions(-) diff --git a/store/mysql/circuitbreaker_config.go b/store/mysql/circuitbreaker_config.go index 5a7f5c729..df31639c4 100644 --- a/store/mysql/circuitbreaker_config.go +++ b/store/mysql/circuitbreaker_config.go @@ -26,6 +26,16 @@ import ( "github.com/polarismesh/polaris/store" ) +const ( + labelCreateCircuitBreakerRuleOld = "createCircuitBreakerRuleOld" + labelTagCircuitBreakerRuleOld = "tagCircuitBreakerRuleOld" + labelDeleteTagCircuitBreakerRuleOld = "deleteTagCircuitBreakerRuleOld" + labelReleaseCircuitBreakerRuleOld = "releaseCircuitBreakerRuleOld" + labelUnbindCircuitBreakerRuleOld = "unbindCircuitBreakerRuleOld" + labelUpdateCircuitBreakerRuleOld = "updateCircuitBreakerRuleOld" + labelDeleteCircuitBreakerRuleOld = "deleteCircuitBreakerRuleOld" +) + // circuitBreakerStore 的实现 type circuitBreakerStore struct { master *BaseDB @@ -34,45 +44,57 @@ type circuitBreakerStore struct { // CreateCircuitBreaker 创建一个新的熔断规则 func (c *circuitBreakerStore) CreateCircuitBreaker(cb *model.CircuitBreaker) error { - if err := c.cleanCircuitBreaker(cb.ID, cb.Version); err != nil { - log.Errorf("[Store][circuitBreaker] clean master for circuit breaker(%s, %s) err: %s", - cb.ID, cb.Version, err.Error()) - return store.Error(err) - } + return c.master.processWithTransaction(labelCreateCircuitBreakerRuleOld, func(tx *BaseTx) error { + if err := cleanCircuitBreaker(tx, cb.ID, cb.Version); err != nil { + log.Errorf("[Store][circuitBreaker] clean master for circuit breaker(%s, %s) err: %s", + cb.ID, cb.Version, err.Error()) + return store.Error(err) + } - str := `insert into circuitbreaker_rule + str := `insert into circuitbreaker_rule (id, version, name, namespace, business, department, comment, inbounds, outbounds, token, owner, revision, flag, ctime, mtime) values(?,?,?,?,?,?,?,?,?,?,?,?,?,sysdate(),sysdate())` - if _, err := c.master.Exec(str, cb.ID, cb.Version, cb.Name, cb.Namespace, cb.Business, cb.Department, - cb.Comment, cb.Inbounds, cb.Outbounds, cb.Token, cb.Owner, cb.Revision, 0); err != nil { - log.Errorf("[Store][circuitBreaker] create circuit breaker(%s, %s, %s) err: %s", - cb.ID, cb.Name, cb.Version, err.Error()) - return store.Error(err) - } - - return nil + if _, err := tx.Exec(str, cb.ID, cb.Version, cb.Name, cb.Namespace, cb.Business, cb.Department, + cb.Comment, cb.Inbounds, cb.Outbounds, cb.Token, cb.Owner, cb.Revision, 0); err != nil { + log.Errorf("[Store][circuitBreaker] create circuit breaker(%s, %s, %s) err: %s", + cb.ID, cb.Name, cb.Version, err.Error()) + return store.Error(err) + } + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to %s commit tx, create rule(%+v) commit tx err: %s", + labelCreateCircuitBreakerRuleOld, cb, err.Error()) + return err + } + return nil + }) } // TagCircuitBreaker 给master熔断规则打一个version tag func (c *circuitBreakerStore) TagCircuitBreaker(cb *model.CircuitBreaker) error { - if err := c.cleanCircuitBreaker(cb.ID, cb.Version); err != nil { - log.Errorf("[Store][circuitBreaker] clean tag for circuit breaker(%s, %s) err: %s", - cb.ID, cb.Version, err.Error()) - return store.Error(err) - } - - if err := c.tagCircuitBreaker(cb); err != nil { - log.Errorf("[Store][circuitBreaker] create tag for circuit breaker(%s, %s) err: %s", - cb.ID, cb.Version, err.Error()) - return store.Error(err) - } + return c.master.processWithTransaction(labelTagCircuitBreakerRuleOld, func(tx *BaseTx) error { + if err := cleanCircuitBreaker(tx, cb.ID, cb.Version); err != nil { + log.Errorf("[Store][circuitBreaker] clean tag for circuit breaker(%s, %s) err: %s", + cb.ID, cb.Version, err.Error()) + return store.Error(err) + } - return nil + if err := tagCircuitBreaker(tx, cb); err != nil { + log.Errorf("[Store][circuitBreaker] create tag for circuit breaker(%s, %s) err: %s", + cb.ID, cb.Version, err.Error()) + return store.Error(err) + } + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to %s commit tx, tag rule(%+v) commit tx err: %s", + labelTagCircuitBreakerRuleOld, cb, err.Error()) + return err + } + return nil + }) } // tagCircuitBreaker 给master熔断规则打一个version tag的内部函数 -func (c *circuitBreakerStore) tagCircuitBreaker(cb *model.CircuitBreaker) error { +func tagCircuitBreaker(tx *BaseTx, cb *model.CircuitBreaker) error { // 需要保证master规则存在 str := `insert into circuitbreaker_rule (id, version, name, namespace, business, department, comment, inbounds, @@ -82,7 +104,7 @@ func (c *circuitBreakerStore) tagCircuitBreaker(cb *model.CircuitBreaker) error where id = ? and version = 'master'` str = fmt.Sprintf(str, cb.ID, cb.Version, cb.Name, cb.Namespace, cb.Business, cb.Department, cb.Comment, cb.Inbounds, cb.Outbounds, cb.Token, cb.Owner, cb.Revision) - result, err := c.master.Exec(str, cb.ID) + result, err := tx.Exec(str, cb.ID) if err != nil { log.Errorf("[Store][CircuitBreaker] exec create tag sql(%s) err: %s", str, err.Error()) return err @@ -101,21 +123,28 @@ func (c *circuitBreakerStore) tagCircuitBreaker(cb *model.CircuitBreaker) error // ReleaseCircuitBreaker 发布熔断规则 func (c *circuitBreakerStore) ReleaseCircuitBreaker(cbr *model.CircuitBreakerRelation) error { - if err := c.cleanCircuitBreakerRelation(cbr); err != nil { - return store.Error(err) - } + return c.master.processWithTransaction(labelReleaseCircuitBreakerRuleOld, func(tx *BaseTx) error { + if err := c.cleanCircuitBreakerRelation(cbr); err != nil { + return store.Error(err) + } - if err := c.releaseCircuitBreaker(cbr); err != nil { - log.Errorf("[Store][CircuitBreaker] release rule err: %s", err.Error()) - return store.Error(err) - } + if err := releaseCircuitBreaker(tx, cbr); err != nil { + log.Errorf("[Store][CircuitBreaker] release rule err: %s", err.Error()) + return store.Error(err) + } - return nil + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to %s commit tx, release rule(%+v) commit tx err: %s", + labelReleaseCircuitBreakerRuleOld, cbr, err.Error()) + return err + } + return nil + }) } // releaseCircuitBreaker 发布熔断规则的内部函数 // @note 可能存在服务的规则,由旧的更新到新的场景 -func (c *circuitBreakerStore) releaseCircuitBreaker(cbr *model.CircuitBreakerRelation) error { +func releaseCircuitBreaker(tx *BaseTx, cbr *model.CircuitBreakerRelation) error { // 发布规则时,需要保证规则已经被标记 str := `insert into circuitbreaker_rule_relation(service_id, rule_id, rule_version, flag, ctime, mtime) select '%s', '%s', '%s', 0, sysdate(), sysdate() from service, circuitbreaker_rule @@ -126,7 +155,7 @@ func (c *circuitBreakerStore) releaseCircuitBreaker(cbr *model.CircuitBreakerRel rule_id = ?, rule_version = ?, flag = 0, mtime = sysdate()` str = fmt.Sprintf(str, cbr.ServiceID, cbr.RuleID, cbr.RuleVersion) log.Infof("[Store][CircuitBreaker] exec release sql(%s)", str) - result, err := c.master.Exec(str, cbr.ServiceID, cbr.RuleID, cbr.RuleVersion, cbr.RuleID, cbr.RuleVersion) + result, err := tx.Exec(str, cbr.ServiceID, cbr.RuleID, cbr.RuleVersion, cbr.RuleID, cbr.RuleVersion) if err != nil { log.Errorf("[Store][CircuitBreaker] release exec sql(%s) err: %s", str, err.Error()) return err @@ -144,64 +173,93 @@ func (c *circuitBreakerStore) releaseCircuitBreaker(cbr *model.CircuitBreakerRel // UnbindCircuitBreaker 解绑熔断规则 func (c *circuitBreakerStore) UnbindCircuitBreaker(serviceID, ruleID, ruleVersion string) error { - str := `update circuitbreaker_rule_relation set flag = 1, mtime = sysdate() where service_id = ? and rule_id = ? + return c.master.processWithTransaction(labelUnbindCircuitBreakerRuleOld, func(tx *BaseTx) error { + str := `update circuitbreaker_rule_relation set flag = 1, mtime = sysdate() where service_id = ? and rule_id = ? and rule_version = ?` - if _, err := c.master.Exec(str, serviceID, ruleID, ruleVersion); err != nil { - log.Errorf("[Store][CircuitBreaker] delete relation(%s) err: %s", serviceID, err.Error()) - return err - } + if _, err := tx.Exec(str, serviceID, ruleID, ruleVersion); err != nil { + log.Errorf("[Store][CircuitBreaker] delete relation(%s) err: %s", serviceID, err.Error()) + return err + } - return nil + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to %s commit tx, unbind rule(%s) commit tx err: %s", + labelUnbindCircuitBreakerRuleOld, ruleID, err.Error()) + return err + } + + return nil + }) } // DeleteTagCircuitBreaker 删除非master熔断规则 func (c *circuitBreakerStore) DeleteTagCircuitBreaker(id string, version string) error { - // 需要保证规则无绑定服务 - str := `update circuitbreaker_rule set flag = 1, mtime = sysdate() + return c.master.processWithTransaction(labelDeleteTagCircuitBreakerRuleOld, func(tx *BaseTx) error { + // 需要保证规则无绑定服务 + str := `update circuitbreaker_rule set flag = 1, mtime = sysdate() where id = ? and version = ? and id not in (select DISTINCT(rule_id) from circuitbreaker_rule_relation where rule_id = ? and rule_version = ? and flag = 0)` - log.Infof("[Store][circuitBreaker] delete rule id(%s) version(%s), sql(%s)", id, version, str) - if _, err := c.master.Exec(str, id, version, id, version); err != nil { - log.Errorf("[Store][CircuitBreaker] delete tag rule(%s, %s) exec err: %s", id, version, err.Error()) - return err - } + log.Infof("[Store][circuitBreaker] delete rule id(%s) version(%s), sql(%s)", id, version, str) + if _, err := tx.Exec(str, id, version, id, version); err != nil { + log.Errorf("[Store][CircuitBreaker] delete tag rule(%s, %s) exec err: %s", id, version, err.Error()) + return err + } - return nil + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to %s commit tx, delete tag rule(%s) commit tx err: %s", + labelDeleteTagCircuitBreakerRuleOld, id, err.Error()) + return err + } + return nil + }) } // DeleteMasterCircuitBreaker 删除master熔断规则 func (c *circuitBreakerStore) DeleteMasterCircuitBreaker(id string) error { - // 需要保证所有已标记的规则无绑定服务 - str := `update circuitbreaker_rule set flag = 1, mtime = sysdate() + return c.master.processWithTransaction(labelDeleteCircuitBreakerRuleOld, func(tx *BaseTx) error { + // 需要保证所有已标记的规则无绑定服务 + str := `update circuitbreaker_rule set flag = 1, mtime = sysdate() where id = ? and version = 'master' and id not in (select DISTINCT(rule_id) from circuitbreaker_rule_relation where rule_id = ? and flag = 0)` - log.Infof("[Store][CircuitBreaker] delete master rule(%s) sql(%s)", id, str) - if _, err := c.master.Exec(str, id, id); err != nil { - log.Errorf("[Store][CircuitBreaker] delete master rule(%s) exec err: %s", id, err.Error()) - return err - } + log.Infof("[Store][CircuitBreaker] delete master rule(%s) sql(%s)", id, str) + if _, err := tx.Exec(str, id, id); err != nil { + log.Errorf("[Store][CircuitBreaker] delete master rule(%s) exec err: %s", id, err.Error()) + return err + } - return nil + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to %s commit tx, delete rule(%s) commit tx err: %s", + labelDeleteCircuitBreakerRuleOld, id, err.Error()) + return err + } + return nil + }) } // UpdateCircuitBreaker 修改熔断规则 // @note 只允许修改master熔断规则 func (c *circuitBreakerStore) UpdateCircuitBreaker(cb *model.CircuitBreaker) error { - str := `update circuitbreaker_rule set business = ?, department = ?, comment = ?, + return c.master.processWithTransaction(labelUpdateCircuitBreakerRuleOld, func(tx *BaseTx) error { + str := `update circuitbreaker_rule set business = ?, department = ?, comment = ?, inbounds = ?, outbounds = ?, token = ?, owner = ?, revision = ?, mtime = sysdate() where id = ? and version = ?` - if _, err := c.master.Exec(str, cb.Business, cb.Department, cb.Comment, cb.Inbounds, - cb.Outbounds, cb.Token, cb.Owner, cb.Revision, cb.ID, cb.Version); err != nil { - log.Errorf("[Store][CircuitBreaker] update rule(%s,%s) exec err: %s", cb.ID, cb.Version, err.Error()) - return err - } + if _, err := tx.Exec(str, cb.Business, cb.Department, cb.Comment, cb.Inbounds, + cb.Outbounds, cb.Token, cb.Owner, cb.Revision, cb.ID, cb.Version); err != nil { + log.Errorf("[Store][CircuitBreaker] update rule(%s,%s) exec err: %s", cb.ID, cb.Version, err.Error()) + return err + } - return nil + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to %s commit tx, update rule(%+v) commit tx err: %s", + labelUpdateCircuitBreakerRuleOld, cb, err.Error()) + return err + } + return nil + }) } // GetCircuitBreaker 获取熔断规则 @@ -505,9 +563,9 @@ func (c *circuitBreakerStore) cleanCircuitBreakerRelation(cbr *model.CircuitBrea } // cleanCircuitBreaker 彻底清理熔断规则 -func (c *circuitBreakerStore) cleanCircuitBreaker(id string, version string) error { +func cleanCircuitBreaker(tx *BaseTx, id string, version string) error { str := `delete from circuitbreaker_rule where id = ? and version = ? and flag = 1` - if _, err := c.master.Exec(str, id, version); err != nil { + if _, err := tx.Exec(str, id, version); err != nil { log.Errorf("[Store][database] clean circuit breaker(%s) err: %s", id, err.Error()) return store.Error(err) } diff --git a/store/mysql/client.go b/store/mysql/client.go index 89cea2225..6d74bb375 100644 --- a/store/mysql/client.go +++ b/store/mysql/client.go @@ -43,10 +43,6 @@ func (cs *clientStore) CreateClient(client *model.Client) error { return fmt.Errorf("add Business missing some params, id %s, name %s", clientID, client.Proto().GetHost().GetValue()) } - // clean the old items before add - if err := cs.DeleteClient(clientID); err != nil { - return err - } err := RetryTransaction("createClient", func() error { return cs.createClient(client) }) @@ -69,8 +65,8 @@ func (cs *clientStore) UpdateClient(client *model.Client) error { return serr } -// DeleteClient delete the client info -func (cs *clientStore) DeleteClient(clientID string) error { +// deleteClient delete the client info +func (cs *clientStore) deleteClient(clientID string) error { if clientID == "" { return errors.New("delete client missing client id") } @@ -305,6 +301,10 @@ func (cs *clientStore) createClient(client *model.Client) error { return err } defer func() { _ = tx.Rollback() }() + // clean the old items before add + if err := cs.deleteClient(client.Proto().GetId().GetValue()); err != nil { + return err + } if err := addClientMain(tx, client); err != nil { log.Errorf("[Store][database] add client main err: %s", err.Error()) return err diff --git a/store/mysql/default.go b/store/mysql/default.go index 9f8cb531c..4e12fc002 100644 --- a/store/mysql/default.go +++ b/store/mysql/default.go @@ -244,7 +244,7 @@ func (s *stableStore) StartTx() (store.Tx, error) { // newStore 初始化子类 func (s *stableStore) newStore() { - s.namespaceStore = &namespaceStore{db: s.master, slave: s.slave} + s.namespaceStore = &namespaceStore{master: s.master, slave: s.slave} s.serviceStore = &serviceStore{master: s.master, slave: s.slave} @@ -252,9 +252,9 @@ func (s *stableStore) newStore() { s.routingConfigStore = &routingConfigStore{master: s.master, slave: s.slave} - s.l5Store = &l5Store{db: s.master} + s.l5Store = &l5Store{master: s.master, slave: s.slave} - s.rateLimitStore = &rateLimitStore{db: s.master, slave: s.slave} + s.rateLimitStore = &rateLimitStore{master: s.master, slave: s.slave} s.circuitBreakerStore = &circuitBreakerStore{master: s.master, slave: s.slave} diff --git a/store/mysql/fault_detect_config.go b/store/mysql/fault_detect_config.go index 53d7e1257..91d758181 100644 --- a/store/mysql/fault_detect_config.go +++ b/store/mysql/fault_detect_config.go @@ -199,7 +199,7 @@ func (f *faultDetectRuleStore) GetFaultDetectRulesForCache( if firstUpdate { str += " and flag != 1" } - rows, err := f.master.Query(str, timeToTimestamp(mtime)) + rows, err := f.slave.Query(str, timeToTimestamp(mtime)) if err != nil { log.Errorf("[Store][database] query fault detect rules with mtime err: %s", err.Error()) return nil, err diff --git a/store/mysql/group.go b/store/mysql/group.go index 79c65fa79..9daed9092 100644 --- a/store/mysql/group.go +++ b/store/mysql/group.go @@ -63,11 +63,6 @@ func (u *groupStore) AddGroup(group *model.UserGroupDetail) error { "add usergroup missing some params, groupId is %s, name is %s", group.ID, group.Name)) } - // 先清理无效数据 - if err := u.cleanInValidGroup(group.Name, group.Owner); err != nil { - return store.Error(err) - } - err := RetryTransaction("addGroup", func() error { return u.addGroup(group) }) @@ -83,6 +78,11 @@ func (u *groupStore) addGroup(group *model.UserGroupDetail) error { defer func() { _ = tx.Rollback() }() + // 先清理无效数据 + if err := u.cleanInValidGroup(group.Name, group.Owner); err != nil { + return store.Error(err) + } + addSql := ` INSERT INTO user_group (id, name, owner, token, token_enable, comment, flag, ctime, mtime) VALUES (?, ?, ?, ?, ?, ?, ?, sysdate(), sysdate()) diff --git a/store/mysql/instance.go b/store/mysql/instance.go index 2a9a21d4e..afa691548 100644 --- a/store/mysql/instance.go +++ b/store/mysql/instance.go @@ -37,11 +37,6 @@ type instanceStore struct { // AddInstance 添加实例 func (ins *instanceStore) AddInstance(instance *model.Instance) error { - // 新增数据之前,必须先清理老数据 - if err := ins.CleanInstance(instance.ID()); err != nil { - return err - } - err := RetryTransaction("addInstance", func() error { return ins.addInstance(instance) }) @@ -63,6 +58,12 @@ func (ins *instanceStore) addInstance(instance *model.Instance) error { log.Errorf("[Store][database] rlock service(%s) err: %s", instance.ServiceID, err.Error()) return err } + + // 新增数据之前,必须先清理老数据 + if err := cleanInstance(tx, instance.ID()); err != nil { + return err + } + if revision == "" { log.Errorf("[Store][database] not found service(%s)", instance.ServiceID) return store.NewStatusError(store.NotFoundService, "not found service") @@ -92,11 +93,6 @@ func (ins *instanceStore) addInstance(instance *model.Instance) error { // BatchAddInstances 批量增加实例 func (ins *instanceStore) BatchAddInstances(instances []*model.Instance) error { - // 由于已经走了 replace into 逻辑,因为这里不需要在进行额外的数据删除动作 - // if err := ins.BatchClearInstances(instances); err != nil { - // log.Errorf("[Store][database] batch clear instances err: %s", err.Error()) - // return err - // } err := RetryTransaction("batchAddInstances", func() error { return ins.batchAddInstances(instances) @@ -138,37 +134,6 @@ func (ins *instanceStore) batchAddInstances(instances []*model.Instance) error { return nil } -// BatchClearInstances 批量清理实例信息 -// 注意:依赖instance表修改结果,id外键修改为删除级联 -func (ins *instanceStore) BatchClearInstances(instances []*model.Instance) error { - if len(instances) == 0 { - return nil - } - - ids := make([]interface{}, 0, len(instances)) - var paramStr string - first := true - for _, entry := range instances { - if first { - paramStr = "(?" - first = false - } else { - paramStr += ", ?" - } - ids = append(ids, entry.ID()) - } - paramStr += ")" - - log.Infof("[Store][database] clean instance(%+v)", ids) // 先打印日志 - mainStr := "delete from instance where flag = 1 and id in " + paramStr - if _, err := ins.master.Exec(mainStr, ids...); err != nil { - log.Errorf("[Store][database] clean instance main err: %s", err.Error()) - return err - } - - return nil -} - // UpdateInstance 更新实例 func (ins *instanceStore) UpdateInstance(instance *model.Instance) error { err := RetryTransaction("updateInstance", func() error { @@ -219,11 +184,29 @@ func (ins *instanceStore) updateInstance(instance *model.Instance) error { } // CleanInstance 清理数据 -// TODO 后续修改instance表,id外键删除级联,那么可以执行一次delete操作 func (ins *instanceStore) CleanInstance(instanceID string) error { + return RetryTransaction("cleanInstance", func() error { + return ins.master.processWithTransaction("cleanInstance", func(tx *BaseTx) error { + if err := cleanInstance(tx, instanceID); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] clean instance commit tx err: %s", err.Error()) + return err + } + + return nil + }) + }) +} + +// cleanInstance 清理数据 +// TODO 后续修改instance表,id外键删除级联,那么可以执行一次delete操作 +func cleanInstance(tx *BaseTx, instanceID string) error { log.Infof("[Store][database] clean instance(%s)", instanceID) mainStr := "delete from instance where id = ? and flag = 1" - if _, err := ins.master.Exec(mainStr, instanceID); err != nil { + if _, err := tx.Exec(mainStr, instanceID); err != nil { log.Errorf("[Store][database] clean instance(%s), err: %s", instanceID, err.Error()) return store.Error(err) } @@ -235,21 +218,45 @@ func (ins *instanceStore) DeleteInstance(instanceID string) error { if instanceID == "" { return errors.New("delete Instance Missing instance id") } + return RetryTransaction("deleteInstance", func() error { + return ins.master.processWithTransaction("deleteInstance", func(tx *BaseTx) error { + str := "update instance set flag = 1, mtime = sysdate() where `id` = ?" + if _, err := tx.Exec(str, instanceID); err != nil { + return store.Error(err) + } - str := "update instance set flag = 1, mtime = sysdate() where `id` = ?" - _, err := ins.master.Exec(str, instanceID) - return store.Error(err) + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] delete instance commit tx err: %s", err.Error()) + return err + } + + return nil + }) + }) } // BatchDeleteInstances 批量删除实例 func (ins *instanceStore) BatchDeleteInstances(ids []interface{}) error { - return BatchOperation("delete-instance", ids, func(objects []interface{}) error { - if len(objects) == 0 { + return RetryTransaction("batchDeleteInstance", func() error { + return ins.master.processWithTransaction("batchDeleteInstance", func(tx *BaseTx) error { + if err := BatchOperation("delete-instance", ids, func(objects []interface{}) error { + if len(objects) == 0 { + return nil + } + str := `update instance set flag = 1, mtime = sysdate() where id in ( ` + PlaceholdersN(len(objects)) + `)` + _, err := tx.Exec(str, objects...) + return store.Error(err) + }); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] batch delete instance commit tx err: %s", err.Error()) + return err + } + return nil - } - str := `update instance set flag = 1, mtime = sysdate() where id in ( ` + PlaceholdersN(len(objects)) + `)` - _, err := ins.master.Exec(str, objects...) - return store.Error(err) + }) }) } @@ -508,42 +515,80 @@ func (ins *instanceStore) GetInstanceMeta(instanceID string) (map[string]string, // SetInstanceHealthStatus 设置实例健康状态 func (ins *instanceStore) SetInstanceHealthStatus(instanceID string, flag int, revision string) error { - str := "update instance set health_status = ?, revision = ?, mtime = sysdate() where `id` = ?" - _, err := ins.master.Exec(str, flag, revision, instanceID) - return store.Error(err) + return RetryTransaction("setInstanceHealthStatus", func() error { + return ins.master.processWithTransaction("setInstanceHealthStatus", func(tx *BaseTx) error { + str := "update instance set health_status = ?, revision = ?, mtime = sysdate() where `id` = ?" + if _, err := tx.Exec(str, flag, revision, instanceID); err != nil { + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] set instance health status commit tx err: %s", err.Error()) + return err + } + + return nil + }) + }) } // BatchSetInstanceHealthStatus 批量设置健康状态 func (ins *instanceStore) BatchSetInstanceHealthStatus(ids []interface{}, isolate int, revision string) error { - return BatchOperation("set-instance-healthy", ids, func(objects []interface{}) error { - if len(objects) == 0 { + return RetryTransaction("batchSetInstanceHealthStatus", func() error { + return ins.master.processWithTransaction("batchSetInstanceHealthStatus", func(tx *BaseTx) error { + if err := BatchOperation("set-instance-healthy", ids, func(objects []interface{}) error { + if len(objects) == 0 { + return nil + } + str := "update instance set health_status = ?, revision = ?, mtime = sysdate() where id in " + str += "(" + PlaceholdersN(len(objects)) + ")" + args := make([]interface{}, 0, len(objects)+2) + args = append(args, isolate) + args = append(args, revision) + args = append(args, objects...) + _, err := tx.Exec(str, args...) + return store.Error(err) + }); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] batch set instance health status commit tx err: %s", err.Error()) + return err + } + return nil - } - str := "update instance set health_status = ?, revision = ?, mtime = sysdate() where id in " - str += "(" + PlaceholdersN(len(objects)) + ")" - args := make([]interface{}, 0, len(objects)+2) - args = append(args, isolate) - args = append(args, revision) - args = append(args, objects...) - _, err := ins.master.Exec(str, args...) - return store.Error(err) + }) }) } // BatchSetInstanceIsolate 批量设置实例隔离状态 func (ins *instanceStore) BatchSetInstanceIsolate(ids []interface{}, isolate int, revision string) error { - return BatchOperation("set-instance-isolate", ids, func(objects []interface{}) error { - if len(objects) == 0 { + return RetryTransaction("batchSetInstanceIsolate", func() error { + return ins.master.processWithTransaction("batchSetInstanceIsolate", func(tx *BaseTx) error { + if err := BatchOperation("set-instance-isolate", ids, func(objects []interface{}) error { + if len(objects) == 0 { + return nil + } + str := "update instance set isolate = ?, revision = ?, mtime = sysdate() where id in " + str += "(" + PlaceholdersN(len(objects)) + ")" + args := make([]interface{}, 0, len(objects)+2) + args = append(args, isolate) + args = append(args, revision) + args = append(args, objects...) + _, err := tx.Exec(str, args...) + return store.Error(err) + }); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] batch set instance isolate commit tx err: %s", err.Error()) + return err + } + return nil - } - str := "update instance set isolate = ?, revision = ?, mtime = sysdate() where id in " - str += "(" + PlaceholdersN(len(objects)) + ")" - args := make([]interface{}, 0, len(objects)+2) - args = append(args, isolate) - args = append(args, revision) - args = append(args, objects...) - _, err := ins.master.Exec(str, args...) - return store.Error(err) + }) }) } diff --git a/store/mysql/l5.go b/store/mysql/l5.go index 94d31b743..86b57feb8 100644 --- a/store/mysql/l5.go +++ b/store/mysql/l5.go @@ -27,7 +27,8 @@ import ( // l5Store 实现了L5Store type l5Store struct { - db *BaseDB + master *BaseDB // 大部分操作都用主数据库 + slave *BaseDB // 缓存相关的读取,请求到slave } // GetL5Extend 获取L5扩展数据 @@ -55,7 +56,7 @@ func (l5 *l5Store) GenNextL5Sid(layoutID uint32) (string, error) { // genNextL5Sid func (l5 *l5Store) genNextL5Sid(layoutID uint32) (string, error) { - tx, err := l5.db.Begin() + tx, err := l5.master.Begin() if err != nil { log.Errorf("[Store][database] get next l5 sid tx begin err: %s", err.Error()) return "", err @@ -104,7 +105,7 @@ func (l5 *l5Store) GetMoreL5Extend(mtime time.Time) (map[string]map[string]inter // GetMoreL5Routes 获取更多的L5 Route信息 func (l5 *l5Store) GetMoreL5Routes(flow uint32) ([]*model.Route, error) { str := getL5RouteSelectSQL() + " where Fflow > ?" - rows, err := l5.db.Query(str, flow) + rows, err := l5.slave.Query(str, flow) if err != nil { log.Errorf("[Store][database] get more l5 route query err: %s", err.Error()) return nil, err @@ -116,7 +117,7 @@ func (l5 *l5Store) GetMoreL5Routes(flow uint32) ([]*model.Route, error) { // GetMoreL5Policies 获取更多的L5 Policy信息 func (l5 *l5Store) GetMoreL5Policies(flow uint32) ([]*model.Policy, error) { str := getL5PolicySelectSQL() + " where Fflow > ?" - rows, err := l5.db.Query(str, flow) + rows, err := l5.slave.Query(str, flow) if err != nil { log.Errorf("[Store][database] get more l5 policy query err: %s", err.Error()) return nil, err @@ -128,7 +129,7 @@ func (l5 *l5Store) GetMoreL5Policies(flow uint32) ([]*model.Policy, error) { // GetMoreL5Sections 获取更多的L5 Section信息 func (l5 *l5Store) GetMoreL5Sections(flow uint32) ([]*model.Section, error) { str := getL5SectionSelectSQL() + " where Fflow > ?" - rows, err := l5.db.Query(str, flow) + rows, err := l5.slave.Query(str, flow) if err != nil { log.Errorf("[Store][database] get more l5 section query err: %s", err.Error()) return nil, err @@ -140,7 +141,7 @@ func (l5 *l5Store) GetMoreL5Sections(flow uint32) ([]*model.Section, error) { // GetMoreL5IPConfigs 获取更多的L5 IPConfig信息 func (l5 *l5Store) GetMoreL5IPConfigs(flow uint32) ([]*model.IPConfig, error) { str := getL5IPConfigSelectSQL() + " where Fflow > ?" - rows, err := l5.db.Query(str, flow) + rows, err := l5.slave.Query(str, flow) if err != nil { log.Errorf("[Store][database] get more l5 ip config query err: %s", err.Error()) return nil, err diff --git a/store/mysql/maintain.go b/store/mysql/maintain.go index a9caf2b3c..0e2f3fc39 100644 --- a/store/mysql/maintain.go +++ b/store/mysql/maintain.go @@ -71,19 +71,27 @@ type leaderElectionStore struct { master *BaseDB } -// CreateLeaderElection +// CreateLeaderElection insert election key into leader table func (l *leaderElectionStore) CreateLeaderElection(key string) error { log.Debugf("[Store][database] create leader election (%s)", key) - mainStr := "insert ignore into leader_election (elect_key, leader) values (?, ?)" + return l.master.processWithTransaction("createLeaderElection", func(tx *BaseTx) error { + mainStr := "insert ignore into leader_election (elect_key, leader) values (?, ?)" - _, err := l.master.Exec(mainStr, key, "") - if err != nil { - log.Errorf("[Store][database] create leader election (%s), err: %s", key, err.Error()) - } - return err + _, err := tx.Exec(mainStr, key, "") + if err != nil { + log.Errorf("[Store][database] create leader election (%s), err: %s", key, err.Error()) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] create leader election (%s) commit tx err: %s", key, err.Error()) + return err + } + + return nil + }) } -// GetVersion +// GetVersion get the version from election func (l *leaderElectionStore) GetVersion(key string) (int64, error) { log.Debugf("[Store][database] get version (%s)", key) mainStr := "select version from leader_election where elect_key = ?" @@ -96,26 +104,36 @@ func (l *leaderElectionStore) GetVersion(key string) (int64, error) { return count, store.Error(err) } -// CompareAndSwapVersion +// CompareAndSwapVersion compare key version and update func (l *leaderElectionStore) CompareAndSwapVersion(key string, curVersion int64, newVersion int64, leader string) (bool, error) { + var rows int64 + err := l.master.processWithTransaction("compareAndSwapVersion", func(tx *BaseTx) error { + log.Debugf("[Store][database] compare and swap version (%s, %d, %d, %s)", key, curVersion, newVersion, leader) + mainStr := "update leader_election set leader = ?, version = ? where elect_key = ? and version = ?" + result, err := tx.Exec(mainStr, leader, newVersion, key, curVersion) + if err != nil { + log.Errorf("[Store][database] compare and swap version (%s), err: %s", key, err.Error()) + return store.Error(err) + } + tRows, err := result.RowsAffected() + if err != nil { + log.Errorf("[Store][database] compare and swap version (%s), get RowsAffected err: %s", key, err.Error()) + return store.Error(err) + } - log.Debugf("[Store][database] compare and swap version (%s, %d, %d, %s)", key, curVersion, newVersion, leader) - mainStr := "update leader_election set leader = ?, version = ? where elect_key = ? and version = ?" - result, err := l.master.DB.Exec(mainStr, leader, newVersion, key, curVersion) - if err != nil { - log.Errorf("[Store][database] compare and swap version (%s), err: %s", key, err.Error()) - return false, store.Error(err) - } - rows, err := result.RowsAffected() - if err != nil { - log.Errorf("[Store][database] compare and swap version (%s), get RowsAffected err: %s", key, err.Error()) - return false, store.Error(err) - } - return (rows > 0), nil + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] create leader election (%s) commit tx err: %s", key, err.Error()) + return err + } + + rows = tRows + return nil + }) + return rows > 0, err } -// CheckMtimeExpired +// CheckMtimeExpired check last modify time expired func (l *leaderElectionStore) CheckMtimeExpired(key string, leaseTime int32) (bool, error) { log.Debugf("[Store][database] check mtime expired (%s, %d)", key, leaseTime) mainStr := "select count(1) from leader_election where elect_key = ? and mtime < " + @@ -129,7 +147,7 @@ func (l *leaderElectionStore) CheckMtimeExpired(key string, leaseTime int32) (bo return (count > 0), store.Error(err) } -// ListLeaderElection +// ListLeaderElections list the election records func (l *leaderElectionStore) ListLeaderElections() ([]*model.LeaderElection, error) { log.Info("[Store][database] list leader election") mainStr := "select elect_key, leader, UNIX_TIMESTAMP(ctime), UNIX_TIMESTAMP(mtime) from leader_election" @@ -331,7 +349,7 @@ func (le *leaderElectionStateMachine) setReleaseTickLimit() { le.releaseTickLimit = LeaseTime / TickTime * 3 } -// StartLeaderElection +// StartLeaderElection start the election procedure func (m *maintainStore) StartLeaderElection(key string) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -361,7 +379,7 @@ func (m *maintainStore) StartLeaderElection(key string) error { return nil } -// StopLeaderElections +// StopLeaderElections stop the election procedure func (m *maintainStore) StopLeaderElections() { m.mutex.Lock() defer m.mutex.Unlock() @@ -371,27 +389,27 @@ func (m *maintainStore) StopLeaderElections() { } } -// IsLeader -func (maintain *maintainStore) IsLeader(key string) bool { - maintain.mutex.Lock() - defer maintain.mutex.Unlock() - le, ok := maintain.leMap[key] +// IsLeader check leader +func (m *maintainStore) IsLeader(key string) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + le, ok := m.leMap[key] if !ok { return false } return le.isLeaderAtomic() } -// ListLeaderElections -func (maintain *maintainStore) ListLeaderElections() ([]*model.LeaderElection, error) { - return maintain.leStore.ListLeaderElections() +// ListLeaderElections list election records +func (m *maintainStore) ListLeaderElections() ([]*model.LeaderElection, error) { + return m.leStore.ListLeaderElections() } -// ReleaseLeaderElection -func (maintain *maintainStore) ReleaseLeaderElection(key string) error { - maintain.mutex.Lock() - defer maintain.mutex.Unlock() - le, ok := maintain.leMap[key] +// ReleaseLeaderElection release election lock +func (m *maintainStore) ReleaseLeaderElection(key string) error { + m.mutex.Lock() + defer m.mutex.Unlock() + le, ok := m.leMap[key] if !ok { return fmt.Errorf("LeaderElection(%s) not started", key) } @@ -401,21 +419,32 @@ func (maintain *maintainStore) ReleaseLeaderElection(key string) error { } // BatchCleanDeletedInstances batch clean soft deleted instances -func (maintain *maintainStore) BatchCleanDeletedInstances(batchSize uint32) (uint32, error) { +func (m *maintainStore) BatchCleanDeletedInstances(batchSize uint32) (uint32, error) { log.Infof("[Store][database] batch clean soft deleted instances(%d)", batchSize) - mainStr := "delete from instance where flag = 1 limit ?" - result, err := maintain.master.Exec(mainStr, batchSize) - if err != nil { - log.Errorf("[Store][database] batch clean soft deleted instances(%d), err: %s", batchSize, err.Error()) - return 0, store.Error(err) - } + var rows int64 + err := m.master.processWithTransaction("batchCleanDeletedInstances", func(tx *BaseTx) error { + mainStr := "delete from instance where flag = 1 limit ?" + result, err := tx.Exec(mainStr, batchSize) + if err != nil { + log.Errorf("[Store][database] batch clean soft deleted instances(%d), err: %s", batchSize, err.Error()) + return store.Error(err) + } - rows, err := result.RowsAffected() - if err != nil { - log.Warnf("[Store][database] batch clean soft deleted instances(%d), get RowsAffected err: %s", - batchSize, err.Error()) - return 0, store.Error(err) - } + tRows, err := result.RowsAffected() + if err != nil { + log.Warnf("[Store][database] batch clean soft deleted instances(%d), get RowsAffected err: %s", + batchSize, err.Error()) + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] batch clean soft deleted instances(%d) commit tx err: %s", + batchSize, err.Error()) + return err + } - return uint32(rows), nil + rows = tRows + return nil + }) + return uint32(rows), err } diff --git a/store/mysql/namespace.go b/store/mysql/namespace.go index d6ed8a224..2ad72592e 100644 --- a/store/mysql/namespace.go +++ b/store/mysql/namespace.go @@ -29,8 +29,8 @@ import ( // namespaceStore 实现了NamespaceStore type namespaceStore struct { - db *BaseDB - slave *BaseDB + master *BaseDB // 大部分操作都用主数据库 + slave *BaseDB // 缓存相关的读取,请求到slave } // AddNamespace 添加命名空间 @@ -38,15 +38,26 @@ func (ns *namespaceStore) AddNamespace(namespace *model.Namespace) error { if namespace.Name == "" { return errors.New("store add namespace name is empty") } - - // 先删除无效数据,再添加新数据 - if err := ns.cleanNamespace(namespace.Name); err != nil { - return err - } - - str := "insert into namespace(name, comment, token, owner, ctime, mtime) values(?,?,?,?,sysdate(),sysdate())" - _, err := ns.db.Exec(str, namespace.Name, namespace.Comment, namespace.Token, namespace.Owner) - return store.Error(err) + return RetryTransaction("addNamespace", func() error { + return ns.master.processWithTransaction("addNamespace", func(tx *BaseTx) error { + // 先删除无效数据,再添加新数据 + if err := cleanNamespace(tx, namespace.Name); err != nil { + return err + } + + str := "insert into namespace(name, comment, token, owner, ctime, mtime) values(?,?,?,?,sysdate(),sysdate())" + if _, err := tx.Exec(str, namespace.Name, namespace.Comment, namespace.Token, namespace.Owner); err != nil { + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] batch delete instance commit tx err: %s", err.Error()) + return err + } + + return nil + }) + }) } // UpdateNamespace 更新命名空间,目前只更新owner @@ -54,10 +65,21 @@ func (ns *namespaceStore) UpdateNamespace(namespace *model.Namespace) error { if namespace.Name == "" { return errors.New("store update namespace name is empty") } - - str := "update namespace set owner = ?, comment = ?,mtime = sysdate() where name = ?" - _, err := ns.db.Exec(str, namespace.Owner, namespace.Comment, namespace.Name) - return store.Error(err) + return RetryTransaction("updateNamespace", func() error { + return ns.master.processWithTransaction("updateNamespace", func(tx *BaseTx) error { + str := "update namespace set owner = ?, comment = ?,mtime = sysdate() where name = ?" + if _, err := tx.Exec(str, namespace.Owner, namespace.Comment, namespace.Name); err != nil { + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] batch delete instance commit tx err: %s", err.Error()) + return err + } + + return nil + }) + }) } // UpdateNamespaceToken 更新命名空间token @@ -66,10 +88,21 @@ func (ns *namespaceStore) UpdateNamespaceToken(name string, token string) error return fmt.Errorf( "store update namespace token some param are empty, name is %s, token is %s", name, token) } - - str := "update namespace set token = ?, mtime = sysdate() where name = ?" - _, err := ns.db.Exec(str, token, name) - return store.Error(err) + return RetryTransaction("updateNamespaceToken", func() error { + return ns.master.processWithTransaction("updateNamespaceToken", func(tx *BaseTx) error { + str := "update namespace set token = ?, mtime = sysdate() where name = ?" + if _, err := tx.Exec(str, token, name); err != nil { + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] batch delete instance commit tx err: %s", err.Error()) + return err + } + + return nil + }) + }) } // ListNamespaces 展示owner下所有的命名空间 TODO @@ -79,7 +112,7 @@ func (ns *namespaceStore) ListNamespaces(owner string) ([]*model.Namespace, erro } str := genNamespaceSelectSQL() + " where owner like '%?%'" - rows, err := ns.db.Query(str, owner) + rows, err := ns.master.Query(str, owner) if err != nil { return nil, err } @@ -138,7 +171,7 @@ func (ns *namespaceStore) getNamespacesCount(filter map[string][]string) (uint32 str, args := genNamespaceWhereSQLAndArgs(str, filter, nil, 0, 1) var count uint32 - err := ns.db.QueryRow(str, args...).Scan(&count) + err := ns.master.QueryRow(str, args...).Scan(&count) switch { case err == sql.ErrNoRows: log.Errorf("[Store][database] no row with this namespace filter") @@ -157,7 +190,7 @@ func (ns *namespaceStore) getNamespaces(filter map[string][]string, offset, limi order := &Order{"mtime", "desc"} str, args := genNamespaceWhereSQLAndArgs(str, filter, order, offset, limit) - rows, err := ns.db.Query(str, args...) + rows, err := ns.master.Query(str, args...) if err != nil { log.Errorf("[Store][database] get namespaces by filter query err: %s", err.Error()) return nil, err @@ -173,7 +206,7 @@ func (ns *namespaceStore) getNamespace(name string) (*model.Namespace, error) { } str := genNamespaceSelectSQL() + " where name = ?" - rows, err := ns.db.Query(str, name) + rows, err := ns.master.Query(str, name) if err != nil { log.Errorf("[Store][database] get namespace query err: %s", err.Error()) return nil, err @@ -191,11 +224,11 @@ func (ns *namespaceStore) getNamespace(name string) (*model.Namespace, error) { } // clean真实的数据,只有flag=1的数据才可以清除 -func (ns *namespaceStore) cleanNamespace(name string) error { +func cleanNamespace(tx *BaseTx, name string) error { str := "delete from namespace where name = ? and flag = 1" // 必须打印日志说明 log.Infof("[Store][database] clean namespace(%s)", name) - if _, err := ns.db.Exec(str, name); err != nil { + if _, err := tx.Exec(str, name); err != nil { log.Infof("[Store][database] clean namespace(%s) err: %s", name, err.Error()) return err } diff --git a/store/mysql/ratelimit_config.go b/store/mysql/ratelimit_config.go index bf71c16e1..cf3b090d8 100644 --- a/store/mysql/ratelimit_config.go +++ b/store/mysql/ratelimit_config.go @@ -33,8 +33,8 @@ var _ store.RateLimitStore = (*rateLimitStore)(nil) // rateLimitStore RateLimitStore的实现 type rateLimitStore struct { - db *BaseDB - slave *BaseDB + master *BaseDB + slave *BaseDB } // CreateRateLimit 新建限流规则 @@ -63,7 +63,7 @@ func limitToEtimeStr(limit *model.RateLimit) string { // createRateLimit func (rls *rateLimitStore) createRateLimit(limit *model.RateLimit) error { - tx, err := rls.db.Begin() + tx, err := rls.master.Begin() if err != nil { log.Errorf("[Store][database] create rate limit(%+v) begin tx err: %s", limit, err.Error()) return err @@ -129,7 +129,7 @@ func (rls *rateLimitStore) EnableRateLimit(limit *model.RateLimit) error { // enableRateLimit func (rls *rateLimitStore) enableRateLimit(limit *model.RateLimit) error { - tx, err := rls.db.Begin() + tx, err := rls.master.Begin() if err != nil { log.Errorf("[Store][database] update rate limit(%+v) begin tx err: %s", limit, err.Error()) return err @@ -147,7 +147,7 @@ func (rls *rateLimitStore) enableRateLimit(limit *model.RateLimit) error { return err } - if err := rls.updateLastRevision(tx, limit.ServiceID, limit.Revision); err != nil { + if err := updateLastRevision(tx, limit.ServiceID, limit.Revision); err != nil { log.Errorf("[Store][database][Update] update rate limit revision with service id(%s) err: %s", limit.ServiceID, err.Error()) return err @@ -162,7 +162,7 @@ func (rls *rateLimitStore) enableRateLimit(limit *model.RateLimit) error { // updateRateLimit func (rls *rateLimitStore) updateRateLimit(limit *model.RateLimit) error { - tx, err := rls.db.Begin() + tx, err := rls.master.Begin() if err != nil { log.Errorf("[Store][database] update rate limit(%+v) begin tx err: %s", limit, err.Error()) return err @@ -181,7 +181,7 @@ func (rls *rateLimitStore) updateRateLimit(limit *model.RateLimit) error { return err } - if err := rls.updateLastRevision(tx, limit.ServiceID, limit.Revision); err != nil { + if err := updateLastRevision(tx, limit.ServiceID, limit.Revision); err != nil { log.Errorf("[Store][database][Update] update rate limit revision with service id(%s) err: %s", limit.ServiceID, err.Error()) return err @@ -209,7 +209,7 @@ func (rls *rateLimitStore) DeleteRateLimit(limit *model.RateLimit) error { // deleteRateLimit func (rls *rateLimitStore) deleteRateLimit(limit *model.RateLimit) error { - tx, err := rls.db.Begin() + tx, err := rls.master.Begin() if err != nil { log.Errorf("[Store][database] delete rate limit(%+v) begin tx err: %s", limit, err.Error()) return err @@ -225,7 +225,7 @@ func (rls *rateLimitStore) deleteRateLimit(limit *model.RateLimit) error { return err } - if err := rls.updateLastRevision(tx, limit.ServiceID, limit.Revision); err != nil { + if err := updateLastRevision(tx, limit.ServiceID, limit.Revision); err != nil { log.Errorf("[Store][database][Delete] update rate limit revision with service id(%s) err: %s", limit.ServiceID, err.Error()) return err @@ -248,7 +248,7 @@ func (rls *rateLimitStore) GetRateLimitWithID(id string) (*model.RateLimit, erro str := `select id, name, disable, service_id, method, labels, priority, rule, revision, flag, unix_timestamp(ctime), unix_timestamp(mtime), unix_timestamp(etime) from ratelimit_config where id = ? and flag = 0` - rows, err := rls.db.Query(str, id) + rows, err := rls.master.Query(str, id) if err != nil { log.Errorf("[Store][database] query rate limit with id(%s) err: %s", id, err.Error()) return nil, err @@ -390,7 +390,7 @@ func (rls *rateLimitStore) getBriefRateLimits( args = append(args, offset, limit) str = str + queryStr + ` order by ratelimit_config.mtime desc limit ?, ?` - rows, err := rls.db.Query(str, args...) + rows, err := rls.master.Query(str, args...) if err != nil { log.Errorf("[Store][database] query rate limits err: %s", err.Error()) return nil, err @@ -449,7 +449,7 @@ func (rls *rateLimitStore) getExpandRateLimits( args = append(args, offset, limit) str = str + queryStr + ` order by ratelimit_config.mtime desc limit ?, ?` - rows, err := rls.db.Query(str, args...) + rows, err := rls.master.Query(str, args...) if err != nil { log.Errorf("[Store][database] query rate limits err: %s", err.Error()) return nil, err @@ -505,7 +505,7 @@ func (rls *rateLimitStore) getExpandRateLimitsCount(filter map[string]string) (u queryStr, args := genFilterRateLimitSQL(filter) str = str + queryStr var total uint32 - err := rls.db.QueryRow(str, args...).Scan(&total) + err := rls.master.QueryRow(str, args...).Scan(&total) switch { case err == sql.ErrNoRows: return 0, nil @@ -552,18 +552,8 @@ func genFilterRateLimitSQL(query map[string]string) (string, []interface{}) { return str, args } -// cleanRateLimit 从数据库清除限流规则数据 -func (rls *rateLimitStore) cleanRateLimit(id string) error { - str := `delete from ratelimit_config where id = ? and flag = 1` - if _, err := rls.db.Exec(str, id); err != nil { - log.Errorf("[Store][database] clean rate limit id(%s) err: %s", id, err.Error()) - return err - } - return nil -} - // updateLastRevision 更新last_revision -func (rls *rateLimitStore) updateLastRevision(tx *BaseTx, serviceID string, revision string) error { +func updateLastRevision(tx *BaseTx, serviceID string, revision string) error { str := `update ratelimit_revision set last_revision = ?, mtime = sysdate() where service_id = ?` if _, err := tx.Exec(str, revision, serviceID); err != nil { return err diff --git a/store/mysql/routing_config.go b/store/mysql/routing_config.go index 473c33afe..2e217e585 100644 --- a/store/mysql/routing_config.go +++ b/store/mysql/routing_config.go @@ -44,20 +44,29 @@ func (rs *routingConfigStore) CreateRoutingConfig(conf *model.RoutingConfig) err return store.NewStatusError(store.EmptyParamsErr, "missing some params") } - // 新建之前,先清理老数据 - if err := rs.cleanRoutingConfig(conf.ID); err != nil { - return store.Error(err) - } - - // 服务配置的创建由外层进行服务的保护,这里不需要加锁 - str := `insert into routing_config(id, in_bounds, out_bounds, revision, ctime, mtime) + return RetryTransaction("createRoutingConfig", func() error { + return rs.master.processWithTransaction("createRoutingConfig", func(tx *BaseTx) error { + // 新建之前,先清理老数据 + if err := cleanRoutingConfig(tx, conf.ID); err != nil { + return store.Error(err) + } + + // 服务配置的创建由外层进行服务的保护,这里不需要加锁 + str := `insert into routing_config(id, in_bounds, out_bounds, revision, ctime, mtime) values(?,?,?,?,sysdate(),sysdate())` - if _, err := rs.master.Exec(str, conf.ID, conf.InBounds, conf.OutBounds, conf.Revision); err != nil { - log.Errorf("[Store][database] create routing(%+v) err: %s", conf, err.Error()) - return store.Error(err) - } - - return nil + if _, err := tx.Exec(str, conf.ID, conf.InBounds, conf.OutBounds, conf.Revision); err != nil { + log.Errorf("[Store][database] create routing(%+v) err: %s", conf, err.Error()) + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to create routing commit tx, rule(%+v) commit tx err: %s", + conf, err.Error()) + return err + } + return nil + }) + }) } // UpdateRoutingConfig 更新 @@ -70,14 +79,22 @@ func (rs *routingConfigStore) UpdateRoutingConfig(conf *model.RoutingConfig) err log.Errorf("[Store][database] update routing config missing params") return store.NewStatusError(store.EmptyParamsErr, "missing some params") } - - str := `update routing_config set in_bounds = ?, out_bounds = ?, revision = ?, mtime = sysdate() where id = ?` - if _, err := rs.master.Exec(str, conf.InBounds, conf.OutBounds, conf.Revision, conf.ID); err != nil { - log.Errorf("[Store][database] update routing config(%+v) exec err: %s", conf, err.Error()) - return store.Error(err) - } - - return nil + return RetryTransaction("updateRoutingConfig", func() error { + return rs.master.processWithTransaction("updateRoutingConfig", func(tx *BaseTx) error { + str := `update routing_config set in_bounds = ?, out_bounds = ?, revision = ?, mtime = sysdate() where id = ?` + if _, err := tx.Exec(str, conf.InBounds, conf.OutBounds, conf.Revision, conf.ID); err != nil { + log.Errorf("[Store][database] update routing config(%+v) exec err: %s", conf, err.Error()) + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to update routing commit tx, rule(%+v) commit tx err: %s", + conf, err.Error()) + return err + } + return nil + }) + }) } // DeleteRoutingConfig 删除 @@ -86,14 +103,22 @@ func (rs *routingConfigStore) DeleteRoutingConfig(serviceID string) error { log.Errorf("[Store][database] delete routing config missing service id") return store.NewStatusError(store.EmptyParamsErr, "missing service id") } - - str := `update routing_config set flag = 1, mtime = sysdate() where id = ?` - if _, err := rs.master.Exec(str, serviceID); err != nil { - log.Errorf("[Store][database] delete routing config(%s) err: %s", serviceID, err.Error()) - return store.Error(err) - } - - return nil + return RetryTransaction("deleteRoutingConfig", func() error { + return rs.master.processWithTransaction("deleteRoutingConfig", func(tx *BaseTx) error { + str := `update routing_config set flag = 1, mtime = sysdate() where id = ?` + if _, err := tx.Exec(str, serviceID); err != nil { + log.Errorf("[Store][database] delete routing config(%s) err: %s", serviceID, err.Error()) + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] fail to delete routing commit tx, rule(%s) commit tx err: %s", + serviceID, err.Error()) + return err + } + return nil + }) + }) } // DeleteRoutingConfigTx 删除 @@ -242,9 +267,9 @@ func (rs *routingConfigStore) GetRoutingConfigs(filter map[string]string, } // cleanRoutingConfig 从数据库彻底清理路由配置 -func (rs *routingConfigStore) cleanRoutingConfig(serviceID string) error { +func cleanRoutingConfig(tx *BaseTx, serviceID string) error { str := `delete from routing_config where id = ? and flag = 1` - if _, err := rs.master.Exec(str, serviceID); err != nil { + if _, err := tx.Exec(str, serviceID); err != nil { log.Errorf("[Store][database] clean routing config(%s) err: %s", serviceID, err.Error()) return err } diff --git a/store/mysql/service.go b/store/mysql/service.go index 77685f2fb..6d5aa845a 100644 --- a/store/mysql/service.go +++ b/store/mysql/service.go @@ -67,7 +67,7 @@ func (ss *serviceStore) addService(s *model.Service) error { } // 先清理无效数据 - if err := ss.cleanService(s.Name, s.Namespace); err != nil { + if err := cleanService(tx, s.Name, s.Namespace); err != nil { return err } @@ -159,13 +159,19 @@ func deleteServiceByID(tx *BaseTx, id string) error { // DeleteServiceAlias 删除服务别名 func (ss *serviceStore) DeleteServiceAlias(name string, namespace string) error { - str := "update service set flag = 1, mtime = sysdate() where name = ? and namespace = ?" - if _, err := ss.master.Exec(str, name, namespace); err != nil { - log.Errorf("[Store][database] delete service alias err: %s", err.Error()) - return store.Error(err) - } + return ss.master.processWithTransaction("deleteServiceAlias", func(tx *BaseTx) error { + str := "update service set flag = 1, mtime = sysdate() where name = ? and namespace = ?" + if _, err := tx.Exec(str, name, namespace); err != nil { + log.Errorf("[Store][database] delete service alias err: %s", err.Error()) + return store.Error(err) + } - return nil + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] batch delete service alias commit tx err: %s", err.Error()) + return err + } + return nil + }) } // UpdateServiceAlias 更新服务别名 @@ -311,14 +317,21 @@ func (ss *serviceStore) updateService(service *model.Service, needUpdateOwner bo // UpdateServiceToken 更新服务token func (ss *serviceStore) UpdateServiceToken(id string, token string, revision string) error { - str := `update service set token = ?, revision = ?, mtime = sysdate() where id = ?` - _, err := ss.master.Exec(str, token, revision, id) - if err != nil { - log.Errorf("[Store][database] update service(%s) token err: %s", id, err.Error()) - return store.Error(err) - } + return ss.master.processWithTransaction("updateServiceToken", func(tx *BaseTx) error { + str := `update service set token = ?, revision = ?, mtime = sysdate() where id = ?` + _, err := tx.Exec(str, token, revision, id) + if err != nil { + log.Errorf("[Store][database] update service(%s) token err: %s", id, err.Error()) + return store.Error(err) + } - return nil + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] update service token tx commit err: %s", err.Error()) + return err + } + + return nil + }) } // GetService 获取服务详情,只返回有效的数据 @@ -718,10 +731,10 @@ func (ss *serviceStore) getServiceByID(serviceID string) (*model.Service, error) } // cleanService 清理无效数据,flag=1的数据,只需要删除service即可 -func (ss *serviceStore) cleanService(name string, namespace string) error { +func cleanService(tx *BaseTx, name string, namespace string) error { log.Infof("[Store][database] clean service(%s, %s)", name, namespace) str := "delete from service where name = ? and namespace = ? and flag = 1" - _, err := ss.master.Exec(str, name, namespace) + _, err := tx.Exec(str, name, namespace) if err != nil { log.Errorf("[Store][database] clean service(%s, %s) err: %s", name, namespace, err.Error()) return err From 513495220c7bfdaeff58dcecd9180acdf058cb5b Mon Sep 17 00:00:00 2001 From: andrewshan Date: Sun, 26 Feb 2023 21:31:28 +0800 Subject: [PATCH 5/8] fix: group not in tx --- store/mysql/group.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/store/mysql/group.go b/store/mysql/group.go index 9daed9092..fb1811ba0 100644 --- a/store/mysql/group.go +++ b/store/mysql/group.go @@ -79,7 +79,7 @@ func (u *groupStore) addGroup(group *model.UserGroupDetail) error { defer func() { _ = tx.Rollback() }() // 先清理无效数据 - if err := u.cleanInValidGroup(group.Name, group.Owner); err != nil { + if err := cleanInValidGroup(tx, group.Name, group.Owner); err != nil { return store.Error(err) } @@ -589,11 +589,11 @@ func fetchRown2UserGroup(rows *sql.Rows) (*model.UserGroup, error) { } // cleanInValidUserGroup 清理无效的用户组数据 -func (u *groupStore) cleanInValidGroup(name, owner string) error { +func cleanInValidGroup(tx *BaseTx, name, owner string) error { log.Infof("[Store][User] clean usergroup(%s)", name) str := "delete from user_group where name = ? and flag = 1" - if _, err := u.master.Exec(str, name); err != nil { + if _, err := tx.Exec(str, name); err != nil { log.Errorf("[Store][User] clean usergroup(%s) err: %s", name, err.Error()) return err } From d5215a36c077a7fd95c1a753463399032820e14c Mon Sep 17 00:00:00 2001 From: andrewshan Date: Mon, 27 Feb 2023 10:26:47 +0800 Subject: [PATCH 6/8] =?UTF-8?q?feat=EF=BC=9A=E5=A2=9E=E5=8A=A0=E4=B8=B2?= =?UTF-8?q?=E8=A1=8C=E5=88=9B=E5=BB=BA=E5=AE=9E=E4=BE=8B=E7=9A=84=E8=A6=86?= =?UTF-8?q?=E7=9B=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- service/instance_test.go | 5 +++++ store/api.go | 3 --- store/boltdb/namespace.go | 19 ------------------- store/mysql/namespace.go | 15 --------------- 4 files changed, 5 insertions(+), 37 deletions(-) diff --git a/service/instance_test.go b/service/instance_test.go index a26a523c2..8a2f6d55a 100644 --- a/service/instance_test.go +++ b/service/instance_test.go @@ -61,6 +61,11 @@ func TestCreateInstance(t *testing.T) { defer discoverSuit.cleanServiceName(serviceResp.GetName().GetValue(), serviceResp.GetNamespace().GetValue()) t.Run("正常创建实例-服务没有提前创建", func(t *testing.T) { + bc := namingServer.bc + namingServer.bc = nil + defer func() { + namingServer.bc = bc + }() instanceReq, instanceResp := discoverSuit.createCommonInstance(t, &apiservice.Service{ Name: utils.NewStringValue("test-nocreate-service"), Namespace: utils.NewStringValue(DefaultNamespace), diff --git a/store/api.go b/store/api.go index 6e20c6e29..53b5bb919 100644 --- a/store/api.go +++ b/store/api.go @@ -67,9 +67,6 @@ type NamespaceStore interface { // UpdateNamespaceToken Update namespace token UpdateNamespaceToken(name string, token string) error - // ListNamespaces Query all namespaces under Owner - ListNamespaces(owner string) ([]*model.Namespace, error) - // GetNamespace Get the details of the namespace according to Name GetNamespace(name string) (*model.Namespace, error) diff --git a/store/boltdb/namespace.go b/store/boltdb/namespace.go index 010b21221..de7048ceb 100644 --- a/store/boltdb/namespace.go +++ b/store/boltdb/namespace.go @@ -131,25 +131,6 @@ func (n *namespaceStore) UpdateNamespaceToken(name string, token string) error { return n.handler.UpdateValue(tblNameNamespace, name, properties) } -// ListNamespaces query all namespaces by owner -func (n *namespaceStore) ListNamespaces(owner string) ([]*model.Namespace, error) { - if owner == "" { - return nil, errors.New("store lst namespaces owner is empty") - } - values, err := n.handler.LoadValuesByFilter( - tblNameNamespace, []string{"Owner"}, &model.Namespace{}, func(value map[string]interface{}) bool { - ownerValue, ok := value["Owner"] - if !ok { - return false - } - return strings.Contains(ownerValue.(string), owner) - }) - if err != nil { - return nil, err - } - return toNamespaces(values), nil -} - // GetNamespace query namespace by name func (n *namespaceStore) GetNamespace(name string) (*model.Namespace, error) { values, err := n.handler.LoadValues(tblNameNamespace, []string{name}, &model.Namespace{}) diff --git a/store/mysql/namespace.go b/store/mysql/namespace.go index 2ad72592e..ebc3b0edb 100644 --- a/store/mysql/namespace.go +++ b/store/mysql/namespace.go @@ -105,21 +105,6 @@ func (ns *namespaceStore) UpdateNamespaceToken(name string, token string) error }) } -// ListNamespaces 展示owner下所有的命名空间 TODO -func (ns *namespaceStore) ListNamespaces(owner string) ([]*model.Namespace, error) { - if owner == "" { - return nil, errors.New("store list namespaces owner is empty") - } - - str := genNamespaceSelectSQL() + " where owner like '%?%'" - rows, err := ns.master.Query(str, owner) - if err != nil { - return nil, err - } - - return namespaceFetchRows(rows) -} - // GetNamespace 根据名字获取命名空间详情,只返回有效的 func (ns *namespaceStore) GetNamespace(name string) (*model.Namespace, error) { namespace, err := ns.getNamespace(name) From de0264b81bf5f3bbaa5c402f12b372e0400e7d81 Mon Sep 17 00:00:00 2001 From: andrewshan Date: Mon, 27 Feb 2023 11:13:54 +0800 Subject: [PATCH 7/8] =?UTF-8?q?feat=EF=BC=9Aremove=20ListNamespaces=20refe?= =?UTF-8?q?r?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- store/boltdb/namespace_test.go | 25 ------------------------- store/mock/api_mock.go | 30 ------------------------------ 2 files changed, 55 deletions(-) diff --git a/store/boltdb/namespace_test.go b/store/boltdb/namespace_test.go index da52b5c6b..92b3ae560 100644 --- a/store/boltdb/namespace_test.go +++ b/store/boltdb/namespace_test.go @@ -78,31 +78,6 @@ func TestNamespaceStore_AddNamespace(t *testing.T) { } } -func TestNamespaceStore_ListNamespaces(t *testing.T) { - _ = os.RemoveAll("./table.bolt") - handler, err := NewBoltHandler(&BoltConfig{FileName: "./table.bolt"}) - if err != nil { - t.Fatal(err) - } - defer handler.Close() - nsStore := &namespaceStore{handler: handler} - - if err := InitNamespaceData(nsStore, nsCount); err != nil { - t.Fatal(err) - } - - namespaces, err := nsStore.ListNamespaces(nsOwner) - if err != nil { - t.Fatal(err) - } - for _, namespace := range namespaces { - fmt.Printf("namespace is %+v\n", namespace) - } - if len(namespaces) != nsCount { - t.Fatal(fmt.Sprintf("namespaces count not match, expect %d, got %d", nsCount, len(namespaces))) - } -} - func TestNamespaceStore_GetNamespaces(t *testing.T) { _ = os.RemoveAll("./table.bolt") handler, err := NewBoltHandler(&BoltConfig{FileName: "./table.bolt"}) diff --git a/store/mock/api_mock.go b/store/mock/api_mock.go index 3cc7d3d8d..5ebd77494 100644 --- a/store/mock/api_mock.go +++ b/store/mock/api_mock.go @@ -2029,21 +2029,6 @@ func (mr *MockStoreMockRecorder) ListMasterCircuitBreakers(filters, offset, limi return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListMasterCircuitBreakers", reflect.TypeOf((*MockStore)(nil).ListMasterCircuitBreakers), filters, offset, limit) } -// ListNamespaces mocks base method. -func (m *MockStore) ListNamespaces(owner string) ([]*model.Namespace, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListNamespaces", owner) - ret0, _ := ret[0].([]*model.Namespace) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListNamespaces indicates an expected call of ListNamespaces. -func (mr *MockStoreMockRecorder) ListNamespaces(owner interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListNamespaces", reflect.TypeOf((*MockStore)(nil).ListNamespaces), owner) -} - // ListReleaseCircuitBreakers mocks base method. func (m *MockStore) ListReleaseCircuitBreakers(filters map[string]string, offset, limit uint32) (*model.CircuitBreakerDetail, error) { m.ctrl.T.Helper() @@ -2681,21 +2666,6 @@ func (mr *MockNamespaceStoreMockRecorder) GetNamespaces(filter, offset, limit in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNamespaces", reflect.TypeOf((*MockNamespaceStore)(nil).GetNamespaces), filter, offset, limit) } -// ListNamespaces mocks base method. -func (m *MockNamespaceStore) ListNamespaces(owner string) ([]*model.Namespace, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListNamespaces", owner) - ret0, _ := ret[0].([]*model.Namespace) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListNamespaces indicates an expected call of ListNamespaces. -func (mr *MockNamespaceStoreMockRecorder) ListNamespaces(owner interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListNamespaces", reflect.TypeOf((*MockNamespaceStore)(nil).ListNamespaces), owner) -} - // UpdateNamespace mocks base method. func (m *MockNamespaceStore) UpdateNamespace(namespace *model.Namespace) error { m.ctrl.T.Helper() From f4b6e8ac968d4ba14be6e39079f9ffdad82d01c1 Mon Sep 17 00:00:00 2001 From: andrewshan Date: Mon, 27 Feb 2023 13:26:13 +0800 Subject: [PATCH 8/8] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=E4=B8=8D=E5=9C=A8?= =?UTF-8?q?=E5=90=8C=E4=B8=80=E4=B8=AA=E4=BA=8B=E5=8A=A1=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- store/mysql/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/store/mysql/client.go b/store/mysql/client.go index 6d74bb375..6a27433ab 100644 --- a/store/mysql/client.go +++ b/store/mysql/client.go @@ -66,13 +66,13 @@ func (cs *clientStore) UpdateClient(client *model.Client) error { } // deleteClient delete the client info -func (cs *clientStore) deleteClient(clientID string) error { +func deleteClient(tx *BaseTx, clientID string) error { if clientID == "" { return errors.New("delete client missing client id") } str := "update client set flag = 1, mtime = sysdate() where `id` = ?" - _, err := cs.master.Exec(str, clientID) + _, err := tx.Exec(str, clientID) return store.Error(err) } @@ -302,7 +302,7 @@ func (cs *clientStore) createClient(client *model.Client) error { } defer func() { _ = tx.Rollback() }() // clean the old items before add - if err := cs.deleteClient(client.Proto().GetId().GetValue()); err != nil { + if err := deleteClient(tx, client.Proto().GetId().GetValue()); err != nil { return err } if err := addClientMain(tx, client); err != nil {