Skip to content

Commit

Permalink
Merge branch 'master' into fix424
Browse files Browse the repository at this point in the history
  • Loading branch information
cofyc authored Jun 11, 2019
2 parents e08f70d + 29a5fa3 commit 6a727e4
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 8 deletions.
33 changes: 31 additions & 2 deletions pkg/controller/service_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
v1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap.com/v1alpha1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -122,6 +123,7 @@ var _ ServiceControlInterface = &realServiceControl{}
type FakeServiceControl struct {
SvcLister corelisters.ServiceLister
SvcIndexer cache.Indexer
EpsIndexer cache.Indexer
TcLister v1listers.TidbClusterLister
TcIndexer cache.Indexer
createServiceTracker requestTracker
Expand All @@ -130,10 +132,11 @@ type FakeServiceControl struct {
}

// NewFakeServiceControl returns a FakeServiceControl
func NewFakeServiceControl(svcInformer coreinformers.ServiceInformer, tcInformer tcinformers.TidbClusterInformer) *FakeServiceControl {
func NewFakeServiceControl(svcInformer coreinformers.ServiceInformer, epsInformer coreinformers.EndpointsInformer, tcInformer tcinformers.TidbClusterInformer) *FakeServiceControl {
return &FakeServiceControl{
svcInformer.Lister(),
svcInformer.Informer().GetIndexer(),
epsInformer.Informer().GetIndexer(),
tcInformer.Lister(),
tcInformer.Informer().GetIndexer(),
requestTracker{0, nil, 0},
Expand Down Expand Up @@ -168,7 +171,21 @@ func (ssc *FakeServiceControl) CreateService(_ *v1alpha1.TidbCluster, svc *corev
return ssc.createServiceTracker.err
}

return ssc.SvcIndexer.Add(svc)
err := ssc.SvcIndexer.Add(svc)
if err != nil {
return err
}
// add a new endpoint to indexer if svc has selector
if svc.Spec.Selector != nil {
eps := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: svc.Name,
Namespace: svc.Namespace,
},
}
return ssc.EpsIndexer.Add(eps)
}
return nil
}

// UpdateService updates the service of SvcIndexer
Expand All @@ -179,6 +196,18 @@ func (ssc *FakeServiceControl) UpdateService(_ *v1alpha1.TidbCluster, svc *corev
return nil, ssc.updateServiceTracker.err
}

if svc.Spec.Selector != nil {
eps := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: svc.Name,
Namespace: svc.Namespace,
},
}
err := ssc.EpsIndexer.Update(eps)
if err != nil {
return nil, err
}
}
return svc, ssc.SvcIndexer.Update(svc)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func NewController(
tcInformer := informerFactory.Pingcap().V1alpha1().TidbClusters()
setInformer := kubeInformerFactory.Apps().V1beta1().StatefulSets()
svcInformer := kubeInformerFactory.Core().V1().Services()
epsInformer := kubeInformerFactory.Core().V1().Endpoints()
pvcInformer := kubeInformerFactory.Core().V1().PersistentVolumeClaims()
pvInformer := kubeInformerFactory.Core().V1().PersistentVolumes()
podInformer := kubeInformerFactory.Core().V1().Pods()
Expand Down Expand Up @@ -119,6 +120,7 @@ func NewController(
setInformer.Lister(),
svcInformer.Lister(),
podInformer.Lister(),
epsInformer.Lister(),
podControl,
pvcInformer.Lister(),
pdScaler,
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/tidbcluster/tidb_cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func newFakeTidbClusterController() (*Controller, cache.Indexer, cache.Indexer)
tcInformer := informerFactory.Pingcap().V1alpha1().TidbClusters()
podInformer := kubeInformerFactory.Core().V1().Pods()
nodeInformer := kubeInformerFactory.Core().V1().Nodes()
epsInformer := kubeInformerFactory.Core().V1().Endpoints()
autoFailover := true

tcc := NewController(
Expand Down Expand Up @@ -272,6 +273,7 @@ func newFakeTidbClusterController() (*Controller, cache.Indexer, cache.Indexer)
setInformer.Lister(),
svcInformer.Lister(),
podInformer.Lister(),
epsInformer.Lister(),
podControl,
pvcInformer.Lister(),
pdScaler,
Expand Down
18 changes: 15 additions & 3 deletions pkg/manager/member/pd_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type pdMemberManager struct {
setLister v1beta1.StatefulSetLister
svcLister corelisters.ServiceLister
podLister corelisters.PodLister
epsLister corelisters.EndpointsLister
podControl controller.PodControlInterface
pvcLister corelisters.PersistentVolumeClaimLister
pdScaler Scaler
Expand All @@ -55,6 +56,7 @@ func NewPDMemberManager(pdControl controller.PDControlInterface,
setLister v1beta1.StatefulSetLister,
svcLister corelisters.ServiceLister,
podLister corelisters.PodLister,
epsLister corelisters.EndpointsLister,
podControl controller.PodControlInterface,
pvcLister corelisters.PersistentVolumeClaimLister,
pdScaler Scaler,
Expand All @@ -68,6 +70,7 @@ func NewPDMemberManager(pdControl controller.PDControlInterface,
setLister,
svcLister,
podLister,
epsLister,
podControl,
pvcLister,
pdScaler,
Expand Down Expand Up @@ -258,18 +261,27 @@ func (pmm *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set

pdClient := pmm.pdControl.GetPDClient(tc)

cluster, err := pdClient.GetCluster()
healthInfo, err := pdClient.GetHealth()
if err != nil {
tc.Status.PD.Synced = false
// get endpoints info
eps, epErr := pmm.epsLister.Endpoints(ns).Get(controller.PDMemberName(tcName))
if epErr != nil {
return fmt.Errorf("%s, %s", err, epErr)
}
// pd service has no endpoints
if eps != nil && len(eps.Subsets) == 0 {
return fmt.Errorf("%s, service %s/%s has no endpoints", err, ns, controller.PDMemberName(tcName))
}
return err
}
tc.Status.ClusterID = strconv.FormatUint(cluster.Id, 10)

healthInfo, err := pdClient.GetHealth()
cluster, err := pdClient.GetCluster()
if err != nil {
tc.Status.PD.Synced = false
return err
}
tc.Status.ClusterID = strconv.FormatUint(cluster.Id, 10)
leader, err := pdClient.GetPDLeader()
if err != nil {
tc.Status.PD.Synced = false
Expand Down
18 changes: 17 additions & 1 deletion pkg/manager/member/pd_member_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,27 @@ func TestPDMemberManagerSyncCreate(t *testing.T) {
g.Expect(tc.Spec).To(Equal(oldSpec))

svc1, err := pmm.svcLister.Services(ns).Get(controller.PDMemberName(tcName))
eps1, eperr := pmm.epsLister.Endpoints(ns).Get(controller.PDMemberName(tcName))
if test.pdSvcCreated {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(svc1).NotTo(Equal(nil))
g.Expect(eperr).NotTo(HaveOccurred())
g.Expect(eps1).NotTo(Equal(nil))
} else {
expectErrIsNotFound(g, err)
expectErrIsNotFound(g, eperr)
}

svc2, err := pmm.svcLister.Services(ns).Get(controller.PDPeerMemberName(tcName))
eps2, eperr := pmm.epsLister.Endpoints(ns).Get(controller.PDPeerMemberName(tcName))
if test.pdPeerSvcCreated {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(svc2).NotTo(Equal(nil))
g.Expect(eperr).NotTo(HaveOccurred())
g.Expect(eps2).NotTo(Equal(nil))
} else {
expectErrIsNotFound(g, err)
expectErrIsNotFound(g, eperr)
}

tc1, err := pmm.setLister.StatefulSets(ns).Get(controller.PDMemberName(tcName))
Expand Down Expand Up @@ -243,8 +251,14 @@ func TestPDMemberManagerSyncUpdate(t *testing.T) {

_, err = pmm.svcLister.Services(ns).Get(controller.PDMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())
_, err = pmm.epsLister.Endpoints(ns).Get(controller.PDMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())

_, err = pmm.svcLister.Services(ns).Get(controller.PDPeerMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())
_, err = pmm.epsLister.Endpoints(ns).Get(controller.PDPeerMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())

_, err = pmm.setLister.StatefulSets(ns).Get(controller.PDMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -634,10 +648,11 @@ func newFakePDMemberManager() (*pdMemberManager, *controller.FakeStatefulSetCont
setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets()
svcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Services()
podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Pods()
epsInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Endpoints()
pvcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().PersistentVolumeClaims()
tcInformer := informers.NewSharedInformerFactory(cli, 0).Pingcap().V1alpha1().TidbClusters()
setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer)
svcControl := controller.NewFakeServiceControl(svcInformer, tcInformer)
svcControl := controller.NewFakeServiceControl(svcInformer, epsInformer, tcInformer)
podControl := controller.NewFakePodControl(podInformer)
pdControl := controller.NewFakePDControl()
pdScaler := NewFakePDScaler()
Expand All @@ -652,6 +667,7 @@ func newFakePDMemberManager() (*pdMemberManager, *controller.FakeStatefulSetCont
setInformer.Lister(),
svcInformer.Lister(),
podInformer.Lister(),
epsInformer.Lister(),
podControl,
pvcInformer.Lister(),
pdScaler,
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/member/tidb_member_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,10 @@ func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSet
setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets()
tcInformer := informers.NewSharedInformerFactory(cli, 0).Pingcap().V1alpha1().TidbClusters()
svcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Services()
epsInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Endpoints()
podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Pods()
setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer)
svcControl := controller.NewFakeServiceControl(svcInformer, tcInformer)
svcControl := controller.NewFakeServiceControl(svcInformer, epsInformer, tcInformer)
tidbUpgrader := NewFakeTiDBUpgrader()
tidbFailover := NewFakeTiDBFailover()
tidbControl := controller.NewFakeTiDBControl()
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/member/tikv_member_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,9 +1411,10 @@ func newFakeTiKVMemberManager(tc *v1alpha1.TidbCluster) (
pdControl.SetPDClient(tc, pdClient)
setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets()
svcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Services()
epsInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Endpoints()
tcInformer := informers.NewSharedInformerFactory(cli, 0).Pingcap().V1alpha1().TidbClusters()
setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer)
svcControl := controller.NewFakeServiceControl(svcInformer, tcInformer)
svcControl := controller.NewFakeServiceControl(svcInformer, epsInformer, tcInformer)
podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Pods()
nodeInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Nodes()
tikvScaler := NewFakeTiKVScaler()
Expand Down

0 comments on commit 6a727e4

Please sign in to comment.