Skip to content

Commit

Permalink
Use resolver module in CoreDNS plugin
Browse files Browse the repository at this point in the history
Related to submariner-io#214

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Mar 6, 2023
1 parent 99cdf94 commit 9a5063f
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 465 deletions.
39 changes: 8 additions & 31 deletions coredns/gateway/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,13 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

var logger = log.Logger{Logger: logf.Log.WithName("Gateway")}

type NewClientsetFunc func(c *rest.Config) (dynamic.Interface, error)

// NewClientset is an indirection hook for unit tests to supply fake client sets.
var NewClientset NewClientsetFunc

type Controller struct {
NewClientset NewClientsetFunc
informer cache.Controller
store cache.Store
queue workqueue.Interface
Expand All @@ -61,7 +54,6 @@ type Controller struct {

func NewController() *Controller {
controller := &Controller{
NewClientset: getNewClientsetFunc(),
queue: workqueue.New("Gateway Controller"),
stopCh: make(chan struct{}),
gatewayAvailable: true,
Expand All @@ -77,18 +69,8 @@ func NewController() *Controller {
return controller
}

func getNewClientsetFunc() NewClientsetFunc {
if NewClientset != nil {
return NewClientset
}

return func(c *rest.Config) (dynamic.Interface, error) {
return dynamic.NewForConfig(c)
}
}

func (c *Controller) Start(kubeConfig *rest.Config) error {
gwClientset, err := c.getCheckedClientset(kubeConfig)
func (c *Controller) Start(client dynamic.Interface) error {
gwClientset, err := c.getCheckedClientset(client)
if apierrors.IsNotFound(err) {
logger.Infof("Connectivity component is not installed, disabling Gateway status controller")

Expand All @@ -100,7 +82,7 @@ func (c *Controller) Start(kubeConfig *rest.Config) error {
if err != nil {
return err
}

logger.Infof("Starting Gateway status Controller")

//nolint:wrapcheck // Let the caller wrap these errors.
Expand Down Expand Up @@ -211,7 +193,7 @@ func (c *Controller) updateClusterStatusMap(connections []interface{}) {
}

func (c *Controller) updateLocalClusterIDIfNeeded(clusterID string) {
updateNeeded := clusterID != "" && clusterID != c.LocalClusterID()
updateNeeded := clusterID != "" && clusterID != c.GetLocalClusterID()
if updateNeeded {
logger.Infof("Updating the gateway localClusterID %q ", clusterID)
c.localClusterID.Store(clusterID)
Expand Down Expand Up @@ -264,15 +246,10 @@ func (c *Controller) getClusterStatusMap() map[string]bool {
return c.clusterStatusMap.Load().(map[string]bool)
}

func (c *Controller) getCheckedClientset(kubeConfig *rest.Config) (dynamic.ResourceInterface, error) {
clientSet, err := c.NewClientset(kubeConfig)
if err != nil {
return nil, errors.Wrap(err, "error creating client set")
}

func (c *Controller) getCheckedClientset(client dynamic.Interface) (dynamic.ResourceInterface, error) {
// First check if the Submariner resource is present.
gvr, _ := schema.ParseResourceArg("submariners.v1alpha1.submariner.io")
list, err := clientSet.Resource(*gvr).Namespace(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
list, err := client.Resource(*gvr).Namespace(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) || (err == nil && len(list.Items) == 0) {
return nil, apierrors.NewNotFound(gvr.GroupResource(), "")
}
Expand All @@ -282,7 +259,7 @@ func (c *Controller) getCheckedClientset(kubeConfig *rest.Config) (dynamic.Resou
}

gvr, _ = schema.ParseResourceArg("gateways.v1.submariner.io")
gwClient := clientSet.Resource(*gvr).Namespace(v1.NamespaceAll)
gwClient := client.Resource(*gvr).Namespace(v1.NamespaceAll)
_, err = gwClient.List(context.TODO(), metav1.ListOptions{})

if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) {
Expand Down Expand Up @@ -310,6 +287,6 @@ func (c *Controller) IsConnected(clusterID string) bool {
return !c.gatewayAvailable || c.getClusterStatusMap()[clusterID]
}

func (c *Controller) LocalClusterID() string {
func (c *Controller) GetLocalClusterID() string {
return c.localClusterID.Load().(string)
}
8 changes: 2 additions & 6 deletions coredns/gateway/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
fakeClient "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/rest"
)

const (
Expand Down Expand Up @@ -209,17 +208,14 @@ func newTestDiver() *testDriver {

JustBeforeEach(func() {
t.controller = gateway.NewController()
t.controller.NewClientset = func(c *rest.Config) (dynamic.Interface, error) {
return t.dynClient, nil
}

if t.submarinerObj != nil {
_, err := t.dynClient.Resource(submarinersGVR).Namespace("submariner-operator").Create(context.TODO(), t.submarinerObj,
metav1.CreateOptions{})
Expect(err).To(Succeed())
}

Expect(t.controller.Start(&rest.Config{})).To(Succeed())
Expect(t.controller.Start(t.dynClient)).To(Succeed())
})

AfterEach(func() {
Expand Down Expand Up @@ -259,7 +255,7 @@ func (t *testDriver) localClusterIDUpdateValidationTest(originalLocalClusterID,

func (t *testDriver) awaitValidLocalClusterID(clusterID string) {
Eventually(func() string {
return t.controller.LocalClusterID()
return t.controller.GetLocalClusterID()
}, 5).Should(Equal(clusterID))
}

Expand Down
24 changes: 4 additions & 20 deletions coredns/plugin/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
"github.com/submariner-io/lighthouse/coredns/serviceimport"
)

const PluginName = "lighthouse"
Expand Down Expand Up @@ -70,25 +69,10 @@ func (lh *Lighthouse) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns
func (lh *Lighthouse) getDNSRecord(ctx context.Context, zone string, state *request.Request, w dns.ResponseWriter,
r *dns.Msg, pReq *recordRequest,
) (int, error) {
var isHeadless bool
var (
dnsRecords []serviceimport.DNSRecord
found bool
record *serviceimport.DNSRecord
)

record, found = lh.getClusterIPForSvc(pReq)
dnsRecords, isHeadless, found := lh.Resolver.GetDNSRecords(pReq.namespace, pReq.service, pReq.cluster, pReq.hostname)
if !found {
dnsRecords, found = lh.EndpointSlices.GetDNSRecords(pReq.hostname, pReq.cluster, pReq.namespace,
pReq.service, lh.ClusterStatus.IsConnected)
if !found {
log.Debugf("No record found for %q", state.QName())
return lh.nextOrFailure(ctx, state, r, dns.RcodeNameError)
}

isHeadless = true
} else if record != nil && record.IP != "" {
dnsRecords = append(dnsRecords, *record)
log.Debugf("No record found for %q", state.QName())
return lh.nextOrFailure(ctx, state, r, dns.RcodeNameError)
}

if len(dnsRecords) == 0 {
Expand All @@ -102,7 +86,7 @@ func (lh *Lighthouse) getDNSRecord(ctx context.Context, zone string, state *requ
}

// Count records
localClusterID := lh.ClusterStatus.LocalClusterID()
localClusterID := lh.ClusterStatus.GetLocalClusterID()
for _, record := range dnsRecords {
incDNSQueryCounter(localClusterID, record.ClusterName, pReq.service, pReq.namespace, record.IP)
}
Expand Down
Loading

0 comments on commit 9a5063f

Please sign in to comment.