diff --git a/pkg/balancer/balancer.go b/pkg/balancer/balancer.go new file mode 100644 index 00000000000..e3105f4c0b8 --- /dev/null +++ b/pkg/balancer/balancer.go @@ -0,0 +1,61 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package balancer + +// Policy is the policy of balancer. +type Policy int + +const ( + // PolicyRoundRobin is the round robin policy. + PolicyRoundRobin Policy = iota + // PolicyLeast is the policy to return the least used node. + // TODO: move indexed heap to pkg and use it. + PolicyLeast +) + +func (p Policy) String() string { + switch p { + case PolicyRoundRobin: + return "round-robin" + case PolicyLeast: + return "least" + default: + return "unknown" + } +} + +// Balancer is the interface for balancer. +type Balancer[T uint32 | string] interface { + // Next returns next one. + Next() T + // Put puts one into balancer. + Put(T) + // Delete deletes one from balancer. + Delete(T) + // GetAll returns all nodes. + GetAll() []T + // Len returns the length of nodes. + Len() int +} + +// GenByPolicy generates a balancer by policy. +func GenByPolicy[T uint32 | string](policy Policy) Balancer[T] { + switch policy { + case PolicyRoundRobin: + return NewRoundRobin[T]() + default: // only round-robin is supported now. + return NewRoundRobin[T]() + } +} diff --git a/pkg/balancer/balancer_test.go b/pkg/balancer/balancer_test.go new file mode 100644 index 00000000000..f95487a4cc7 --- /dev/null +++ b/pkg/balancer/balancer_test.go @@ -0,0 +1,102 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package balancer + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBalancerPutAndDelete(t *testing.T) { + t.Parallel() + re := require.New(t) + balancers := []Balancer[uint32]{ + NewRoundRobin[uint32](), + } + for _, balancer := range balancers { + re.Equal(uint32(0), balancer.Next()) + // test put + exists := make(map[uint32]struct{}) + for i := 0; i < 100; i++ { + num := rand.Uint32() + balancer.Put(num) + exists[num] = struct{}{} + re.Equal(len(balancer.GetAll()), len(exists)) + t := balancer.Next() + re.Contains(exists, t) + } + // test delete + for num := range exists { + balancer.Delete(num) + delete(exists, num) + re.Equal(len(balancer.GetAll()), len(exists)) + if len(exists) == 0 { + break + } + t := balancer.Next() + re.NotEqual(t, num) + re.Contains(exists, t) + } + re.Equal(uint32(0), balancer.Next()) + } +} + +func TestBalancerDuplicate(t *testing.T) { + t.Parallel() + re := require.New(t) + balancers := []Balancer[uint32]{ + NewRoundRobin[uint32](), + } + for _, balancer := range balancers { + re.Len(balancer.GetAll(), 0) + // test duplicate put + balancer.Put(1) + re.Len(balancer.GetAll(), 1) + balancer.Put(1) + re.Len(balancer.GetAll(), 1) + // test duplicate delete + balancer.Delete(1) + re.Len(balancer.GetAll(), 0) + balancer.Delete(1) + re.Len(balancer.GetAll(), 0) + } +} + +func TestRoundRobin(t *testing.T) { + t.Parallel() + re := require.New(t) + balancer := NewRoundRobin[uint32]() + for i := 0; i < 100; i++ { + num := rand.Uint32() + balancer.Put(num) + } + statistics := make(map[uint32]int) + for i := 0; i < 1000; i++ { + statistics[balancer.Next()]++ + } + min := 1000 + max := 0 + for _, v := range statistics { + if v < min { + min = v + } + if v > max { + max = v + } + } + re.LessOrEqual(max-min, 10) +} diff --git a/pkg/balancer/round_robin.go b/pkg/balancer/round_robin.go new file mode 100644 index 00000000000..cef35c43a5f --- /dev/null +++ b/pkg/balancer/round_robin.go @@ -0,0 +1,87 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package balancer + +import ( + "sync" + "sync/atomic" +) + +// RoundRobin is a balancer that selects nodes in a round-robin fashion. +type RoundRobin[T uint32 | string] struct { + sync.RWMutex + nodes []T + exists map[T]struct{} + next uint32 +} + +// NewRoundRobin creates a balancer that selects nodes in a round-robin fashion. +func NewRoundRobin[T uint32 | string]() *RoundRobin[T] { + return &RoundRobin[T]{ + nodes: make([]T, 0), + exists: make(map[T]struct{}), + } +} + +// Next returns next address +func (r *RoundRobin[T]) Next() (t T) { + r.RLock() + defer r.RUnlock() + if len(r.nodes) == 0 { + return + } + next := atomic.AddUint32(&r.next, 1) + node := r.nodes[(int(next)-1)%len(r.nodes)] + return node +} + +// GetAll returns all nodes +func (r *RoundRobin[T]) GetAll() []T { + r.RLock() + defer r.RUnlock() + return r.nodes +} + +// Put puts one into balancer. +func (r *RoundRobin[T]) Put(node T) { + r.Lock() + defer r.Unlock() + if _, ok := r.exists[node]; !ok { + r.nodes = append(r.nodes, node) + r.exists[node] = struct{}{} + } +} + +// Delete deletes one from balancer. +func (r *RoundRobin[T]) Delete(node T) { + r.Lock() + defer r.Unlock() + if _, ok := r.exists[node]; ok { + for i, n := range r.nodes { + if n == node { + r.nodes = append(r.nodes[:i], r.nodes[i+1:]...) + delete(r.exists, node) + break + } + } + } +} + +// Len returns the length of nodes. +func (r *RoundRobin[T]) Len() int { + r.RLock() + defer r.RUnlock() + return len(r.nodes) +} diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 6c052ad47dd..7bd8de7bc28 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -81,8 +81,8 @@ type CreateKeyspaceRequest struct { // Using an existing name will result in error. Name string Config map[string]string - // Now is the timestamp used to record creation time. - Now int64 + // CreateTime is the timestamp used to record creation time. + CreateTime int64 } // NewKeyspaceManager creates a Manager of keyspace related data. @@ -140,9 +140,9 @@ func (manager *Manager) Bootstrap() error { return err } req := &CreateKeyspaceRequest{ - Name: keyspaceName, - Now: now, - Config: config, + Name: keyspaceName, + CreateTime: now, + Config: config, } keyspace, err := manager.CreateKeyspace(req) // Ignore the keyspaceExists error for the same reason as saving default keyspace. @@ -190,8 +190,8 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac Id: newID, Name: request.Name, State: keyspacepb.KeyspaceState_ENABLED, - CreatedAt: request.Now, - StateChangedAt: request.Now, + CreatedAt: request.CreateTime, + StateChangedAt: request.CreateTime, Config: request.Config, } err = manager.saveNewKeyspace(keyspace) diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index 8fe6fb40e29..f1ef85711fd 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -59,7 +59,7 @@ func (suite *keyspaceTestSuite) SetupTest() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) allocator := mockid.NewIDAllocator() - kgm := NewKeyspaceGroupManager(suite.ctx, store) + kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0) suite.manager = NewKeyspaceManager(store, nil, allocator, &mockConfig{}, kgm) suite.NoError(kgm.Bootstrap()) suite.NoError(suite.manager.Bootstrap()) @@ -82,12 +82,12 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest { requests := make([]*CreateKeyspaceRequest, count) for i := 0; i < count; i++ { requests[i] = &CreateKeyspaceRequest{ - Name: fmt.Sprintf("test_keyspace%d", i), + Name: fmt.Sprintf("test_keyspace_%d", i), Config: map[string]string{ testConfig1: "100", testConfig2: "200", }, - Now: now, + CreateTime: now, } } return requests @@ -312,8 +312,8 @@ func (suite *keyspaceTestSuite) TestUpdateMultipleKeyspace() { // checkCreateRequest verifies a keyspace meta matches a create request. func checkCreateRequest(re *require.Assertions, request *CreateKeyspaceRequest, meta *keyspacepb.KeyspaceMeta) { re.Equal(request.Name, meta.GetName()) - re.Equal(request.Now, meta.GetCreatedAt()) - re.Equal(request.Now, meta.GetStateChangedAt()) + re.Equal(request.CreateTime, meta.GetCreatedAt()) + re.Equal(request.CreateTime, meta.GetStateChangedAt()) re.Equal(keyspacepb.KeyspaceState_ENABLED, meta.GetState()) re.Equal(request.Config, meta.GetConfig()) } diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 82a606ca205..f18b80ed305 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -16,14 +16,33 @@ package keyspace import ( "context" + "encoding/json" "strconv" "sync" + "time" "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/balancer" + "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/logutil" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +const ( + defaultBalancerPolicy = balancer.PolicyRoundRobin + allocNodeTimeout = 1 * time.Second + allocNodeInterval = 10 * time.Millisecond + // TODO: move it to etcdutil + watchEtcdChangeRetryInterval = 1 * time.Second + maxRetryTimes = 25 + retryInterval = 100 * time.Millisecond ) const ( @@ -33,26 +52,49 @@ const ( // GroupManager is the manager of keyspace group related data. type GroupManager struct { - ctx context.Context + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup // the lock for the groups sync.RWMutex // groups is the cache of keyspace group related information. // user kind -> keyspace group groups map[endpoint.UserKind]*indexedHeap + // store is the storage for keyspace group related information. store endpoint.KeyspaceGroupStorage + + client *clientv3.Client + + // tsoServiceKey is the path of TSO service in etcd. + tsoServiceKey string + // tsoServiceEndKey is the end key of TSO service in etcd. + tsoServiceEndKey string + + policy balancer.Policy + + // TODO: add user kind with different balancer + // when we ensure where the correspondence between tso node and user kind will be found + nodesBalancer balancer.Balancer[string] } // NewKeyspaceGroupManager creates a Manager of keyspace group related data. -func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupStorage) *GroupManager { +func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupStorage, client *clientv3.Client, clusterID uint64) *GroupManager { + ctx, cancel := context.WithCancel(ctx) + key := discovery.TSOPath(clusterID) groups := make(map[endpoint.UserKind]*indexedHeap) for i := 0; i < int(endpoint.UserKindCount); i++ { groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse)) } return &GroupManager{ - ctx: ctx, - store: store, - groups: groups, + ctx: ctx, + cancel: cancel, + store: store, + client: client, + tsoServiceKey: key, + tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/", + policy: defaultBalancerPolicy, + groups: groups, } } @@ -86,9 +128,112 @@ func (m *GroupManager) Bootstrap() error { m.groups[userKind].Put(group) } + // If the etcd client is not nil, start the watch loop. + if m.client != nil { + m.nodesBalancer = balancer.GenByPolicy[string](m.policy) + m.wg.Add(1) + go m.startWatchLoop() + } return nil } +// Close closes the manager. +func (m *GroupManager) Close() { + m.cancel() + m.wg.Wait() +} + +func (m *GroupManager) startWatchLoop() { + defer logutil.LogPanic() + defer m.wg.Done() + ctx, cancel := context.WithCancel(m.ctx) + defer cancel() + var ( + resp *clientv3.GetResponse + revision int64 + err error + ) + for i := 0; i < maxRetryTimes; i++ { + select { + case <-ctx.Done(): + return + case <-time.After(retryInterval): + } + resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey)) + if err == nil { + revision = resp.Header.Revision + for _, item := range resp.Kvs { + s := &discovery.ServiceRegistryEntry{} + if err := json.Unmarshal(item.Value, s); err != nil { + log.Warn("failed to unmarshal service registry entry", zap.Error(err)) + continue + } + m.nodesBalancer.Put(s.ServiceAddr) + } + break + } + log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err)) + } + if err != nil || revision == 0 { + log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err)) + } + for { + select { + case <-ctx.Done(): + return + default: + } + nextRevision, err := m.watchServiceAddrs(ctx, revision) + if err != nil { + log.Error("watcher canceled unexpectedly and a new watcher will start after a while", + zap.Int64("next-revision", nextRevision), + zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)), + zap.Error(err)) + revision = nextRevision + time.Sleep(watchEtcdChangeRetryInterval) + } + } +} + +func (m *GroupManager) watchServiceAddrs(ctx context.Context, revision int64) (int64, error) { + watcher := clientv3.NewWatcher(m.client) + defer watcher.Close() + for { + WatchChan: + watchChan := watcher.Watch(ctx, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey), clientv3.WithRev(revision)) + select { + case <-ctx.Done(): + return revision, nil + case wresp := <-watchChan: + if wresp.CompactRevision != 0 { + log.Warn("required revision has been compacted, the watcher will watch again with the compact revision", + zap.Int64("required-revision", revision), + zap.Int64("compact-revision", wresp.CompactRevision)) + revision = wresp.CompactRevision + goto WatchChan + } + if wresp.Err() != nil { + log.Error("watch is canceled or closed", + zap.Int64("required-revision", revision), + zap.Error(wresp.Err())) + return revision, wresp.Err() + } + for _, event := range wresp.Events { + s := &discovery.ServiceRegistryEntry{} + if err := json.Unmarshal(event.Kv.Value, s); err != nil { + log.Warn("failed to unmarshal service registry entry", zap.Error(err)) + } + switch event.Type { + case clientv3.EventTypePut: + m.nodesBalancer.Put(s.ServiceAddr) + case clientv3.EventTypeDelete: + m.nodesBalancer.Delete(s.ServiceAddr) + } + } + } + } +} + // CreateKeyspaceGroups creates keyspace groups. func (m *GroupManager) CreateKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup) error { m.Lock() @@ -327,7 +472,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3 return err } if splitSourceKg == nil { - return ErrKeyspaceGroupNotFound + return ErrKeyspaceGroupNotExists } // A keyspace group can not take part in multiple split processes. if splitSourceKg.IsSplitting() { @@ -406,7 +551,7 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { return err } if splitTargetKg == nil { - return ErrKeyspaceGroupNotFound + return ErrKeyspaceGroupNotExists } // Check if it's in the split state. if !splitTargetKg.IsSplitTarget() { @@ -418,7 +563,7 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { return err } if splitSourceKg == nil { - return ErrKeyspaceGroupNotFound + return ErrKeyspaceGroupNotExists } if !splitSourceKg.IsSplitSource() { return ErrKeyspaceGroupNotInSplit @@ -442,3 +587,57 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg) return nil } + +// GetNodesNum returns the number of nodes. +func (m *GroupManager) GetNodesNum() int { + return m.nodesBalancer.Len() +} + +// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group. +func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]endpoint.KeyspaceGroupMember, error) { + ctx, cancel := context.WithTimeout(m.ctx, allocNodeTimeout) + defer cancel() + ticker := time.NewTicker(allocNodeInterval) + defer ticker.Stop() + nodes := make([]endpoint.KeyspaceGroupMember, 0, replica) + err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { + kg, err := m.store.LoadKeyspaceGroup(txn, id) + if err != nil { + return err + } + if kg == nil { + return ErrKeyspaceGroupNotExists + } + exists := make(map[string]struct{}) + for _, member := range kg.Members { + exists[member.Address] = struct{}{} + nodes = append(nodes, member) + } + for len(exists) < replica { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + } + num := m.GetNodesNum() + if num < replica || num == 0 { // double check + return ErrNoAvailableNode + } + addr := m.nodesBalancer.Next() + if addr == "" { + return ErrNoAvailableNode + } + if _, ok := exists[addr]; ok { + continue + } + exists[addr] = struct{}{} + nodes = append(nodes, endpoint.KeyspaceGroupMember{Address: addr}) + } + kg.Members = nodes + return m.store.SaveKeyspaceGroup(txn, kg) + }) + if err != nil { + return nil, err + } + return nodes, nil +} diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index 80f4d713c35..ed7992ba552 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -43,7 +43,7 @@ func TestKeyspaceGroupTestSuite(t *testing.T) { func (suite *keyspaceGroupTestSuite) SetupTest() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) - suite.kgm = NewKeyspaceGroupManager(suite.ctx, store) + suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil, 0) idAllocator := mockid.NewIDAllocator() cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions()) suite.kg = NewKeyspaceManager(store, cluster, idAllocator, &mockConfig{}, suite.kgm) @@ -137,7 +137,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceAssignment() { Config: map[string]string{ UserKindKey: endpoint.Standard.String(), }, - Now: time.Now().Unix(), + CreateTime: time.Now().Unix(), }) re.NoError(err) } @@ -180,7 +180,7 @@ func (suite *keyspaceGroupTestSuite) TestUpdateKeyspace() { Config: map[string]string{ UserKindKey: endpoint.Standard.String(), }, - Now: time.Now().Unix(), + CreateTime: time.Now().Unix(), }) re.NoError(err) kg2, err := suite.kgm.GetKeyspaceGroupByID(2) @@ -269,7 +269,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { re.ErrorIs(err, ErrKeyspaceGroupNotInSplit) // finish the split of a non-existing keyspace group err = suite.kgm.FinishSplitKeyspaceByID(5) - re.ErrorIs(err, ErrKeyspaceGroupNotFound) + re.ErrorIs(err, ErrKeyspaceGroupNotExists) // split the in-split keyspace group err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333}) re.ErrorIs(err, ErrKeyspaceGroupInSplit) @@ -304,7 +304,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { // split a non-existing keyspace group err = suite.kgm.SplitKeyspaceGroupByID(3, 5, nil) - re.ErrorIs(err, ErrKeyspaceGroupNotFound) + re.ErrorIs(err, ErrKeyspaceGroupNotExists) // split into an existing keyspace group err = suite.kgm.SplitKeyspaceGroupByID(2, 4, nil) re.ErrorIs(err, ErrKeyspaceGroupExists) diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index 3eaf67d7b24..9b89a1e569e 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -43,16 +43,18 @@ var ( ErrKeyspaceExists = errors.New("keyspace already exists") // ErrKeyspaceGroupExists indicates target keyspace group already exists. ErrKeyspaceGroupExists = errors.New("keyspace group already exists") - // ErrKeyspaceGroupNotFound is used to indicate target keyspace group does not exist. - ErrKeyspaceGroupNotFound = errors.New("keyspace group does not exist") + // ErrKeyspaceGroupNotExists is used to indicate target keyspace group does not exist. + ErrKeyspaceGroupNotExists = errors.New("keyspace group does not exist") // ErrKeyspaceGroupInSplit is used to indicate target keyspace group is in split state. ErrKeyspaceGroupInSplit = errors.New("keyspace group is in split state") // ErrKeyspaceGroupNotInSplit is used to indicate target keyspace group is not in split state. ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state") // ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group. ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group") - errModifyDefault = errors.New("cannot modify default keyspace's state") - errIllegalOperation = errors.New("unknown operation") + // ErrNoAvailableNode is used to indicate no available node in the keyspace group. + ErrNoAvailableNode = errors.New("no available node") + errModifyDefault = errors.New("cannot modify default keyspace's state") + errIllegalOperation = errors.New("unknown operation") // stateTransitionTable lists all allowed next state for the given current state. // Note that transit from any state to itself is allowed for idempotence. diff --git a/pkg/mcs/discovery/key_path.go b/pkg/mcs/discovery/key_path.go index 575b80f80da..0e53b21c9fe 100644 --- a/pkg/mcs/discovery/key_path.go +++ b/pkg/mcs/discovery/key_path.go @@ -14,7 +14,10 @@ package discovery -import "strings" +import ( + "strconv" + "strings" +) const ( registryPrefix = "/ms" @@ -28,3 +31,8 @@ func registryPath(clusterID, serviceName, serviceAddr string) string { func discoveryPath(clusterID, serviceName string) string { return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/") } + +// TSOPath returns the path to store TSO addresses. +func TSOPath(clusterID uint64) string { + return discoveryPath(strconv.FormatUint(clusterID, 10), "tso") + "/" +} diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 7d63321230f..f63922ea64b 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -60,6 +60,16 @@ func (k UserKind) String() string { return "unknown UserKind" } +// IsUserKindValid checks if the user kind is valid. +func IsUserKindValid(kind string) bool { + switch kind { + case Basic.String(), Standard.String(), Enterprise.String(): + return true + default: + return false + } +} + // KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group. type KeyspaceGroupMember struct { Address string `json:"address"` diff --git a/server/apiv2/handlers/keyspace.go b/server/apiv2/handlers/keyspace.go index 2ac5235831b..8aa9bdbc2bf 100644 --- a/server/apiv2/handlers/keyspace.go +++ b/server/apiv2/handlers/keyspace.go @@ -69,9 +69,9 @@ func CreateKeyspace(c *gin.Context) { return } req := &keyspace.CreateKeyspaceRequest{ - Name: createParams.Name, - Config: createParams.Config, - Now: time.Now().Unix(), + Name: createParams.Name, + Config: createParams.Config, + CreateTime: time.Now().Unix(), } meta, err := manager.CreateKeyspace(req) if err != nil { diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index 89bc6a8e5fc..8db553e765a 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -35,6 +35,7 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) { router.GET("", GetKeyspaceGroups) router.GET("/:id", GetKeyspaceGroupByID) router.DELETE("/:id", DeleteKeyspaceGroupByID) + router.POST("/:id/alloc", AllocNodeForKeyspaceGroup) router.POST("/:id/split", SplitKeyspaceGroupByID) router.DELETE("/:id/split", FinishSplitKeyspaceByID) } @@ -57,6 +58,12 @@ func CreateKeyspaceGroups(c *gin.Context) { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") return } + if keyspaceGroup.UserKind == "" { + keyspaceGroup.UserKind = endpoint.Basic.String() + } else if !endpoint.IsUserKindValid(keyspaceGroup.UserKind) { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid user kind") + return + } } svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) @@ -183,6 +190,48 @@ func FinishSplitKeyspaceByID(c *gin.Context) { c.JSON(http.StatusOK, nil) } +// AllocNodeForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups. +type AllocNodeForKeyspaceGroupParams struct { + Replica int `json:"replica"` +} + +// AllocNodeForKeyspaceGroup allocates nodes for keyspace group. +func AllocNodeForKeyspaceGroup(c *gin.Context) { + id, err := validateKeyspaceGroupID(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + manager := svr.GetKeyspaceGroupManager() + allocParams := &AllocNodeForKeyspaceGroupParams{} + err = c.BindJSON(allocParams) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) + return + } + if manager.GetNodesNum() < allocParams.Replica || allocParams.Replica < 1 { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [1, nodes_num]") + return + } + keyspaceGroup, err := manager.GetKeyspaceGroupByID(id) + if err != nil || keyspaceGroup == nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist") + return + } + if len(keyspaceGroup.Members) >= allocParams.Replica { + c.AbortWithStatusJSON(http.StatusBadRequest, "existed replica is larger than the new replica") + return + } + // get the nodes + nodes, err := manager.AllocNodesForKeyspaceGroup(id, allocParams.Replica) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusOK, nodes) +} + func validateKeyspaceGroupID(c *gin.Context) (uint32, error) { id, err := strconv.ParseUint(c.Param("id"), 10, 64) if err != nil { diff --git a/server/server.go b/server/server.go index 73732d26315..44c6d265e2f 100644 --- a/server/server.go +++ b/server/server.go @@ -441,7 +441,7 @@ func (s *Server) startServer(ctx context.Context) error { Step: keyspace.AllocStep, }) if s.IsAPIServiceMode() { - s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage) + s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client, s.clusterID) } s.keyspaceManager = keyspace.NewKeyspaceManager(s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster) @@ -478,6 +478,9 @@ func (s *Server) Close() { log.Info("closing server") s.stopServerLoop() + if s.IsAPIServiceMode() { + s.keyspaceGroupManager.Close() + } if s.client != nil { if err := s.client.Close(); err != nil { diff --git a/tests/integrations/client/keyspace_test.go b/tests/integrations/client/keyspace_test.go index 7cb35820bc6..cb3adfd4d2e 100644 --- a/tests/integrations/client/keyspace_test.go +++ b/tests/integrations/client/keyspace_test.go @@ -37,12 +37,12 @@ func mustMakeTestKeyspaces(re *require.Assertions, server *server.Server, start, manager := server.GetKeyspaceManager() for i := 0; i < count; i++ { keyspaces[i], err = manager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{ - Name: fmt.Sprintf("test_keyspace%d", start+i), + Name: fmt.Sprintf("test_keyspace_%d", start+i), Config: map[string]string{ testConfig1: "100", testConfig2: "200", }, - Now: now, + CreateTime: now, }) re.NoError(err) } @@ -120,9 +120,9 @@ func (suite *clientTestSuite) TestWatchKeyspaces() { func mustCreateKeyspaceAtState(re *require.Assertions, server *server.Server, index int, state keyspacepb.KeyspaceState) *keyspacepb.KeyspaceMeta { manager := server.GetKeyspaceManager() meta, err := manager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{ - Name: fmt.Sprintf("test_keyspace%d", index), - Config: nil, - Now: 0, // Use 0 to indicate unchanged keyspace. + Name: fmt.Sprintf("test_keyspace_%d", index), + Config: nil, + CreateTime: 0, // Use 0 to indicate unchanged keyspace. }) re.NoError(err) switch state { diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go new file mode 100644 index 00000000000..7b0c09c2a7b --- /dev/null +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -0,0 +1,245 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keyspace_test + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/suite" + bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/tempurl" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/integrations/mcs" +) + +const ( + keyspaceGroupsPrefix = "/pd/api/v2/tso/keyspace-groups" +) + +type keyspaceGroupTestSuite struct { + suite.Suite + ctx context.Context + cleanupFunc testutil.CleanupFunc + cluster *tests.TestCluster + server *tests.TestServer + backendEndpoints string + dialClient *http.Client +} + +func TestKeyspaceGroupTestSuite(t *testing.T) { + suite.Run(t, new(keyspaceGroupTestSuite)) +} + +func (suite *keyspaceGroupTestSuite) SetupTest() { + ctx, cancel := context.WithCancel(context.Background()) + suite.ctx = ctx + cluster, err := tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster = cluster + suite.NoError(err) + suite.NoError(cluster.RunInitialServers()) + suite.NotEmpty(cluster.WaitLeader()) + suite.server = cluster.GetServer(cluster.GetLeader()) + suite.NoError(suite.server.BootstrapCluster()) + suite.backendEndpoints = suite.server.GetAddr() + suite.dialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, + } + suite.cleanupFunc = func() { + cancel() + } +} + +func (suite *keyspaceGroupTestSuite) TearDownTest() { + suite.cleanupFunc() + suite.cluster.Destroy() +} + +func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() { + // add three nodes. + nodes := make(map[string]bs.Server) + for i := 0; i < 3; i++ { + s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + nodes[s.GetAddr()] = s + } + mcs.WaitForPrimaryServing(suite.Require(), nodes) + + // create a keyspace group. + kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: endpoint.Standard.String(), + }, + }} + code := suite.tryCreateKeyspaceGroup(kgs) + suite.Equal(http.StatusOK, code) + + // alloc nodes for the keyspace group. + id := 1 + params := &handlers.AllocNodeForKeyspaceGroupParams{ + Replica: 1, + } + code, got := suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusOK, code) + suite.Equal(1, len(got)) + suite.Contains(nodes, got[0].Address) + oldNode := got[0].Address + + // alloc node update to 2. + params.Replica = 2 + code, got = suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusOK, code) + suite.Equal(2, len(got)) + suite.Contains(nodes, got[0].Address) + suite.Contains(nodes, got[1].Address) + suite.True(oldNode == got[0].Address || oldNode == got[1].Address) // the old node is also in the new result. + suite.NotEqual(got[0].Address, got[1].Address) // the two nodes are different. +} + +func (suite *keyspaceGroupTestSuite) TestReplica() { + nodes := make(map[string]bs.Server) + s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + nodes[s.GetAddr()] = s + mcs.WaitForPrimaryServing(suite.Require(), nodes) + + // miss replica. + id := 1 + params := &handlers.AllocNodeForKeyspaceGroupParams{} + code, got := suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) + suite.Empty(got) + + // replica is negative. + params = &handlers.AllocNodeForKeyspaceGroupParams{ + Replica: -1, + } + code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) + + // there is no any keyspace group. + params = &handlers.AllocNodeForKeyspaceGroupParams{ + Replica: 1, + } + code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) + + // the keyspace group is exist. + kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(id), + UserKind: endpoint.Standard.String(), + }, + }} + code = suite.tryCreateKeyspaceGroup(kgs) + suite.Equal(http.StatusOK, code) + params = &handlers.AllocNodeForKeyspaceGroupParams{ + Replica: 1, + } + code, got = suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusOK, code) + suite.True(checkNodes(got, nodes)) + + // the keyspace group is exist, but the replica is more than the num of nodes. + params = &handlers.AllocNodeForKeyspaceGroupParams{ + Replica: 2, + } + code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) + // the keyspace group is exist, the new replica is more than the old replica. + s2, cleanup2 := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup2() + nodes[s2.GetAddr()] = s2 + mcs.WaitForPrimaryServing(suite.Require(), nodes) + params = &handlers.AllocNodeForKeyspaceGroupParams{ + Replica: 2, + } + code, got = suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusOK, code) + suite.True(checkNodes(got, nodes)) + + // the keyspace group is exist, the new replica is equal to the old replica. + params = &handlers.AllocNodeForKeyspaceGroupParams{ + Replica: 2, + } + code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) + + // the keyspace group is exist, the new replica is less than the old replica. + params = &handlers.AllocNodeForKeyspaceGroupParams{ + Replica: 1, + } + code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) + + // the keyspace group is not exist. + id = 2 + params = &handlers.AllocNodeForKeyspaceGroupParams{ + Replica: 1, + } + code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) +} + +func (suite *keyspaceGroupTestSuite) tryAllocNodesForKeyspaceGroup(id int, request *handlers.AllocNodeForKeyspaceGroupParams) (int, []endpoint.KeyspaceGroupMember) { + data, err := json.Marshal(request) + suite.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, suite.server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/alloc", id), bytes.NewBuffer(data)) + suite.NoError(err) + resp, err := suite.dialClient.Do(httpReq) + suite.NoError(err) + defer resp.Body.Close() + nodes := make([]endpoint.KeyspaceGroupMember, 0) + if resp.StatusCode == http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + suite.NoError(err) + suite.NoError(json.Unmarshal(bodyBytes, &nodes)) + } + return resp.StatusCode, nodes +} + +func (suite *keyspaceGroupTestSuite) tryCreateKeyspaceGroup(request *handlers.CreateKeyspaceGroupParams) int { + data, err := json.Marshal(request) + suite.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, suite.server.GetAddr()+keyspaceGroupsPrefix, bytes.NewBuffer(data)) + suite.NoError(err) + resp, err := suite.dialClient.Do(httpReq) + suite.NoError(err) + defer resp.Body.Close() + return resp.StatusCode +} + +func checkNodes(nodes []endpoint.KeyspaceGroupMember, servers map[string]bs.Server) bool { + if len(nodes) != len(servers) { + return false + } + for _, node := range nodes { + if _, ok := servers[node.Address]; !ok { + return false + } + } + return true +} diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index 9235160c6cb..f976208f65b 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -146,7 +146,7 @@ func mustMakeTestKeyspaces(re *require.Assertions, server *tests.TestServer, cou resultMeta := make([]*keyspacepb.KeyspaceMeta, count) for i := 0; i < count; i++ { createRequest := &handlers.CreateKeyspaceParams{ - Name: fmt.Sprintf("test_keyspace%d", i), + Name: fmt.Sprintf("test_keyspace_%d", i), Config: testConfig, } resultMeta[i] = mustCreateKeyspace(re, server, createRequest) diff --git a/tests/server/apiv2/handlers/testutil.go b/tests/server/apiv2/handlers/testutil.go index 6aee761eedb..53e1cb945db 100644 --- a/tests/server/apiv2/handlers/testutil.go +++ b/tests/server/apiv2/handlers/testutil.go @@ -152,6 +152,17 @@ func sendLoadKeyspaceGroupRequest(re *require.Assertions, server *tests.TestServ return resp } +func tryCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams) int { + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix, bytes.NewBuffer(data)) + re.NoError(err) + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + return resp.StatusCode +} + // MustLoadKeyspaceGroupByID loads the keyspace group by ID with HTTP API. func MustLoadKeyspaceGroupByID(re *require.Assertions, server *tests.TestServer, id uint32) *endpoint.KeyspaceGroup { httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d", id), nil) @@ -169,14 +180,14 @@ func MustLoadKeyspaceGroupByID(re *require.Assertions, server *tests.TestServer, // MustCreateKeyspaceGroup creates a keyspace group with HTTP API. func MustCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams) { - data, err := json.Marshal(request) - re.NoError(err) - httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix, bytes.NewBuffer(data)) - re.NoError(err) - resp, err := dialClient.Do(httpReq) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) + code := tryCreateKeyspaceGroup(re, server, request) + re.Equal(http.StatusOK, code) +} + +// FailCreateKeyspaceGroupWithCode fails to create a keyspace group with HTTP API. +func FailCreateKeyspaceGroupWithCode(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams, expect int) { + code := tryCreateKeyspaceGroup(re, server, request) + re.Equal(expect, code) } // MustSplitKeyspaceGroup updates a keyspace group with HTTP API. diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go index bdc89428669..4a79eff7fe6 100644 --- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -16,9 +16,11 @@ package handlers import ( "context" + "net/http" "testing" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" @@ -64,8 +66,50 @@ func (suite *keyspaceGroupTestSuite) TestCreateKeyspaceGroups() { UserKind: endpoint.Standard.String(), }, }} + MustCreateKeyspaceGroup(re, suite.server, kgs) + // miss user kind, use default value. + kgs = &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(3), + }, + }} MustCreateKeyspaceGroup(re, suite.server, kgs) + + // invalid user kind. + kgs = &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(4), + UserKind: "invalid", + }, + }} + FailCreateKeyspaceGroupWithCode(re, suite.server, kgs, http.StatusBadRequest) + + // miss ID. + kgs = &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + UserKind: endpoint.Standard.String(), + }, + }} + FailCreateKeyspaceGroupWithCode(re, suite.server, kgs, http.StatusInternalServerError) + + // invalid ID. + kgs = &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: utils.MaxKeyspaceGroupCount + 1, + UserKind: endpoint.Standard.String(), + }, + }} + FailCreateKeyspaceGroupWithCode(re, suite.server, kgs, http.StatusBadRequest) + + // repeated ID. + kgs = &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(2), + UserKind: endpoint.Standard.String(), + }, + }} + FailCreateKeyspaceGroupWithCode(re, suite.server, kgs, http.StatusInternalServerError) } func (suite *keyspaceGroupTestSuite) TestLoadKeyspaceGroup() { diff --git a/tests/server/keyspace/keyspace_test.go b/tests/server/keyspace/keyspace_test.go index e108879f3c4..5f45030f87e 100644 --- a/tests/server/keyspace/keyspace_test.go +++ b/tests/server/keyspace/keyspace_test.go @@ -81,8 +81,8 @@ func (suite *keyspaceTestSuite) TestRegionLabeler() { var err error for i := 0; i < count; i++ { keyspaces[i], err = manager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{ - Name: fmt.Sprintf("test_keyspace%d", i), - Now: now, + Name: fmt.Sprintf("test_keyspace_%d", i), + CreateTime: now, }) re.NoError(err) }