diff --git a/pkg/controller/service_control.go b/pkg/controller/service_control.go index f9aed4e8371..6a6094d1bb4 100644 --- a/pkg/controller/service_control.go +++ b/pkg/controller/service_control.go @@ -23,6 +23,7 @@ import ( v1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap.com/v1alpha1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -122,6 +123,7 @@ var _ ServiceControlInterface = &realServiceControl{} type FakeServiceControl struct { SvcLister corelisters.ServiceLister SvcIndexer cache.Indexer + EpsIndexer cache.Indexer TcLister v1listers.TidbClusterLister TcIndexer cache.Indexer createServiceTracker requestTracker @@ -130,10 +132,11 @@ type FakeServiceControl struct { } // NewFakeServiceControl returns a FakeServiceControl -func NewFakeServiceControl(svcInformer coreinformers.ServiceInformer, tcInformer tcinformers.TidbClusterInformer) *FakeServiceControl { +func NewFakeServiceControl(svcInformer coreinformers.ServiceInformer, epsInformer coreinformers.EndpointsInformer, tcInformer tcinformers.TidbClusterInformer) *FakeServiceControl { return &FakeServiceControl{ svcInformer.Lister(), svcInformer.Informer().GetIndexer(), + epsInformer.Informer().GetIndexer(), tcInformer.Lister(), tcInformer.Informer().GetIndexer(), requestTracker{0, nil, 0}, @@ -168,7 +171,21 @@ func (ssc *FakeServiceControl) CreateService(_ *v1alpha1.TidbCluster, svc *corev return ssc.createServiceTracker.err } - return ssc.SvcIndexer.Add(svc) + err := ssc.SvcIndexer.Add(svc) + if err != nil { + return err + } + // add a new endpoint to indexer if svc has selector + if svc.Spec.Selector != nil { + eps := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: svc.Name, + Namespace: svc.Namespace, + }, + } + return ssc.EpsIndexer.Add(eps) + } + return nil } // UpdateService updates the service of SvcIndexer @@ -179,6 +196,18 @@ func (ssc *FakeServiceControl) UpdateService(_ *v1alpha1.TidbCluster, svc *corev return nil, ssc.updateServiceTracker.err } + if svc.Spec.Selector != nil { + eps := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: svc.Name, + Namespace: svc.Namespace, + }, + } + err := ssc.EpsIndexer.Update(eps) + if err != nil { + return nil, err + } + } return svc, ssc.SvcIndexer.Update(svc) } diff --git a/pkg/controller/tidbcluster/tidb_cluster_controller.go b/pkg/controller/tidbcluster/tidb_cluster_controller.go index 38eb2f77b13..5b3acb582f2 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_controller.go +++ b/pkg/controller/tidbcluster/tidb_cluster_controller.go @@ -85,6 +85,7 @@ func NewController( tcInformer := informerFactory.Pingcap().V1alpha1().TidbClusters() setInformer := kubeInformerFactory.Apps().V1beta1().StatefulSets() svcInformer := kubeInformerFactory.Core().V1().Services() + epsInformer := kubeInformerFactory.Core().V1().Endpoints() pvcInformer := kubeInformerFactory.Core().V1().PersistentVolumeClaims() pvInformer := kubeInformerFactory.Core().V1().PersistentVolumes() podInformer := kubeInformerFactory.Core().V1().Pods() @@ -119,6 +120,7 @@ func NewController( setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), + epsInformer.Lister(), podControl, pvcInformer.Lister(), pdScaler, diff --git a/pkg/controller/tidbcluster/tidb_cluster_controller_test.go b/pkg/controller/tidbcluster/tidb_cluster_controller_test.go index f5745f9da0b..55a93ed3f3f 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_controller_test.go +++ b/pkg/controller/tidbcluster/tidb_cluster_controller_test.go @@ -223,6 +223,7 @@ func newFakeTidbClusterController() (*Controller, cache.Indexer, cache.Indexer) tcInformer := informerFactory.Pingcap().V1alpha1().TidbClusters() podInformer := kubeInformerFactory.Core().V1().Pods() nodeInformer := kubeInformerFactory.Core().V1().Nodes() + epsInformer := kubeInformerFactory.Core().V1().Endpoints() autoFailover := true tcc := NewController( @@ -272,6 +273,7 @@ func newFakeTidbClusterController() (*Controller, cache.Indexer, cache.Indexer) setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), + epsInformer.Lister(), podControl, pvcInformer.Lister(), pdScaler, diff --git a/pkg/manager/member/pd_member_manager.go b/pkg/manager/member/pd_member_manager.go index d975ebec8de..0b74f4f56cf 100644 --- a/pkg/manager/member/pd_member_manager.go +++ b/pkg/manager/member/pd_member_manager.go @@ -40,6 +40,7 @@ type pdMemberManager struct { setLister v1beta1.StatefulSetLister svcLister corelisters.ServiceLister podLister corelisters.PodLister + epsLister corelisters.EndpointsLister podControl controller.PodControlInterface pvcLister corelisters.PersistentVolumeClaimLister pdScaler Scaler @@ -55,6 +56,7 @@ func NewPDMemberManager(pdControl controller.PDControlInterface, setLister v1beta1.StatefulSetLister, svcLister corelisters.ServiceLister, podLister corelisters.PodLister, + epsLister corelisters.EndpointsLister, podControl controller.PodControlInterface, pvcLister corelisters.PersistentVolumeClaimLister, pdScaler Scaler, @@ -68,6 +70,7 @@ func NewPDMemberManager(pdControl controller.PDControlInterface, setLister, svcLister, podLister, + epsLister, podControl, pvcLister, pdScaler, @@ -258,18 +261,27 @@ func (pmm *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set pdClient := pmm.pdControl.GetPDClient(tc) - cluster, err := pdClient.GetCluster() + healthInfo, err := pdClient.GetHealth() if err != nil { tc.Status.PD.Synced = false + // get endpoints info + eps, epErr := pmm.epsLister.Endpoints(ns).Get(controller.PDMemberName(tcName)) + if epErr != nil { + return fmt.Errorf("%s, %s", err, epErr) + } + // pd service has no endpoints + if eps != nil && len(eps.Subsets) == 0 { + return fmt.Errorf("%s, service %s/%s has no endpoints", err, ns, controller.PDMemberName(tcName)) + } return err } - tc.Status.ClusterID = strconv.FormatUint(cluster.Id, 10) - healthInfo, err := pdClient.GetHealth() + cluster, err := pdClient.GetCluster() if err != nil { tc.Status.PD.Synced = false return err } + tc.Status.ClusterID = strconv.FormatUint(cluster.Id, 10) leader, err := pdClient.GetPDLeader() if err != nil { tc.Status.PD.Synced = false diff --git a/pkg/manager/member/pd_member_manager_test.go b/pkg/manager/member/pd_member_manager_test.go index 2653e13335c..01e36b72035 100644 --- a/pkg/manager/member/pd_member_manager_test.go +++ b/pkg/manager/member/pd_member_manager_test.go @@ -78,19 +78,27 @@ func TestPDMemberManagerSyncCreate(t *testing.T) { g.Expect(tc.Spec).To(Equal(oldSpec)) svc1, err := pmm.svcLister.Services(ns).Get(controller.PDMemberName(tcName)) + eps1, eperr := pmm.epsLister.Endpoints(ns).Get(controller.PDMemberName(tcName)) if test.pdSvcCreated { g.Expect(err).NotTo(HaveOccurred()) g.Expect(svc1).NotTo(Equal(nil)) + g.Expect(eperr).NotTo(HaveOccurred()) + g.Expect(eps1).NotTo(Equal(nil)) } else { expectErrIsNotFound(g, err) + expectErrIsNotFound(g, eperr) } svc2, err := pmm.svcLister.Services(ns).Get(controller.PDPeerMemberName(tcName)) + eps2, eperr := pmm.epsLister.Endpoints(ns).Get(controller.PDPeerMemberName(tcName)) if test.pdPeerSvcCreated { g.Expect(err).NotTo(HaveOccurred()) g.Expect(svc2).NotTo(Equal(nil)) + g.Expect(eperr).NotTo(HaveOccurred()) + g.Expect(eps2).NotTo(Equal(nil)) } else { expectErrIsNotFound(g, err) + expectErrIsNotFound(g, eperr) } tc1, err := pmm.setLister.StatefulSets(ns).Get(controller.PDMemberName(tcName)) @@ -243,8 +251,14 @@ func TestPDMemberManagerSyncUpdate(t *testing.T) { _, err = pmm.svcLister.Services(ns).Get(controller.PDMemberName(tcName)) g.Expect(err).NotTo(HaveOccurred()) + _, err = pmm.epsLister.Endpoints(ns).Get(controller.PDMemberName(tcName)) + g.Expect(err).NotTo(HaveOccurred()) + _, err = pmm.svcLister.Services(ns).Get(controller.PDPeerMemberName(tcName)) g.Expect(err).NotTo(HaveOccurred()) + _, err = pmm.epsLister.Endpoints(ns).Get(controller.PDPeerMemberName(tcName)) + g.Expect(err).NotTo(HaveOccurred()) + _, err = pmm.setLister.StatefulSets(ns).Get(controller.PDMemberName(tcName)) g.Expect(err).NotTo(HaveOccurred()) @@ -634,10 +648,11 @@ func newFakePDMemberManager() (*pdMemberManager, *controller.FakeStatefulSetCont setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets() svcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Services() podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Pods() + epsInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Endpoints() pvcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().PersistentVolumeClaims() tcInformer := informers.NewSharedInformerFactory(cli, 0).Pingcap().V1alpha1().TidbClusters() setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer) - svcControl := controller.NewFakeServiceControl(svcInformer, tcInformer) + svcControl := controller.NewFakeServiceControl(svcInformer, epsInformer, tcInformer) podControl := controller.NewFakePodControl(podInformer) pdControl := controller.NewFakePDControl() pdScaler := NewFakePDScaler() @@ -652,6 +667,7 @@ func newFakePDMemberManager() (*pdMemberManager, *controller.FakeStatefulSetCont setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), + epsInformer.Lister(), podControl, pvcInformer.Lister(), pdScaler, diff --git a/pkg/manager/member/tidb_member_manager_test.go b/pkg/manager/member/tidb_member_manager_test.go index 611ab7d4759..e91e2ebd45c 100644 --- a/pkg/manager/member/tidb_member_manager_test.go +++ b/pkg/manager/member/tidb_member_manager_test.go @@ -527,9 +527,10 @@ func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSet setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets() tcInformer := informers.NewSharedInformerFactory(cli, 0).Pingcap().V1alpha1().TidbClusters() svcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Services() + epsInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Endpoints() podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Pods() setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer) - svcControl := controller.NewFakeServiceControl(svcInformer, tcInformer) + svcControl := controller.NewFakeServiceControl(svcInformer, epsInformer, tcInformer) tidbUpgrader := NewFakeTiDBUpgrader() tidbFailover := NewFakeTiDBFailover() tidbControl := controller.NewFakeTiDBControl() diff --git a/pkg/manager/member/tikv_member_manager_test.go b/pkg/manager/member/tikv_member_manager_test.go index 981174f6ee6..2d29f7051c7 100644 --- a/pkg/manager/member/tikv_member_manager_test.go +++ b/pkg/manager/member/tikv_member_manager_test.go @@ -1411,9 +1411,10 @@ func newFakeTiKVMemberManager(tc *v1alpha1.TidbCluster) ( pdControl.SetPDClient(tc, pdClient) setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets() svcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Services() + epsInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Endpoints() tcInformer := informers.NewSharedInformerFactory(cli, 0).Pingcap().V1alpha1().TidbClusters() setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer) - svcControl := controller.NewFakeServiceControl(svcInformer, tcInformer) + svcControl := controller.NewFakeServiceControl(svcInformer, epsInformer, tcInformer) podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Pods() nodeInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Nodes() tikvScaler := NewFakeTiKVScaler()