Skip to content

Commit

Permalink
Customizable Consul sync tag
Browse files Browse the repository at this point in the history
  • Loading branch information
r2bit committed Nov 29, 2018
1 parent 90af0ee commit b33e67a
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 30 deletions.
14 changes: 7 additions & 7 deletions catalog/from-consul/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ import (
"time"

"github.com/cenkalti/backoff"
fromk8s "github.com/hashicorp/consul-k8s/catalog/from-k8s"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
)

// Source is the source for the sync that watches Consul services and
// updates a Sink whenever the set of services to register changes.
type Source struct {
Client *api.Client // Consul API client
Domain string // Consul DNS domain
Sink Sink // Sink is the sink to update with services
Prefix string // Prefix is a prefix to prepend to services
Log hclog.Logger // Logger
Client *api.Client // Consul API client
Domain string // Consul DNS domain
Sink Sink // Sink is the sink to update with services
Prefix string // Prefix is a prefix to prepend to services
Log hclog.Logger // Logger
ConsulK8STag string // The tag value for services registered
}

// Run is the long-running runloop for watching Consul services and
Expand Down Expand Up @@ -62,7 +62,7 @@ func (s *Source) Run(ctx context.Context) {
// check here.
k8s := false
for _, t := range tags {
if t == fromk8s.ConsulK8STag {
if t == s.ConsulK8STag {
k8s = true
break
}
Expand Down
11 changes: 6 additions & 5 deletions catalog/from-consul/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestSource_ignoreK8S(t *testing.T) {
require.NoError(err)
_, err = client.Catalog().Register(testRegistration("hostB", "svcA", nil), nil)
require.NoError(err)
_, err = client.Catalog().Register(testRegistration("hostB", "svcB", []string{fromk8s.ConsulK8STag}), nil)
_, err = client.Catalog().Register(testRegistration("hostB", "svcB", []string{fromk8s.TestConsulK8STag}), nil)
require.NoError(err)

_, sink, closer := testSource(t, client)
Expand Down Expand Up @@ -249,10 +249,11 @@ func testRegistration(node, service string, tags []string) *api.CatalogRegistrat
func testSource(t *testing.T, client *api.Client) (*Source, *TestSink, func()) {
sink := &TestSink{}
s := &Source{
Client: client,
Domain: "test",
Sink: sink,
Log: hclog.Default(),
Client: client,
Domain: "test",
Sink: sink,
Log: hclog.Default(),
ConsulK8STag: fromk8s.TestConsulK8STag,
}

ctx, cancelF := context.WithCancel(context.Background())
Expand Down
8 changes: 4 additions & 4 deletions catalog/from-k8s/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ const (
// ConsulK8SNS is the key used in the meta to record the namespace
// of the service/node registration.
ConsulK8SNS = "external-k8s-ns"

// ConsulK8STag is the tag value for services registered.
ConsulK8STag = "k8s"
)

// ServiceResource implements controller.Resource to sync Service resource
Expand All @@ -38,6 +35,9 @@ type ServiceResource struct {
Syncer Syncer
Namespace string // K8S namespace to watch

// ConsulK8STag is the tag value for services registered.
ConsulK8STag string

// ExplictEnable should be set to true to require explicit enabling
// using annotations. If this is false, then services are implicitly
// enabled (aka default enabled).
Expand Down Expand Up @@ -231,7 +231,7 @@ func (t *ServiceResource) generateRegistrations(key string) {

baseService := consulapi.AgentService{
Service: svc.Name,
Tags: []string{ConsulK8STag},
Tags: []string{t.ConsulK8STag},
Meta: map[string]string{
ConsulSourceKey: ConsulSourceValue,
ConsulK8SNS: t.namespace(),
Expand Down
7 changes: 4 additions & 3 deletions catalog/from-k8s/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,10 @@ func TestServiceResource_lbAnnotatedTags(t *testing.T) {

// Start the controller
closer := controller.TestControllerRun(&ServiceResource{
Log: hclog.Default(),
Client: client,
Syncer: syncer,
Log: hclog.Default(),
Client: client,
Syncer: syncer,
ConsulK8STag: TestConsulK8STag,
})
defer closer()

Expand Down
9 changes: 6 additions & 3 deletions catalog/from-k8s/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type ConsulSyncer struct {
SyncPeriod time.Duration
ServicePollPeriod time.Duration

// ConsulK8STag is the tag value for services registered.
ConsulK8STag string

lock sync.Mutex
once sync.Once
services map[string]struct{} // set of valid service names
Expand Down Expand Up @@ -175,7 +178,7 @@ func (s *ConsulSyncer) watchReapableServices(ctx context.Context) {
// Go through the service map and find services that should be reaped
for name, tags := range serviceMap {
for _, tag := range tags {
if tag == ConsulK8STag {
if tag == s.ConsulK8STag {
// We only care if we don't know about this service at all.
if _, ok := s.services[name]; ok {
continue
Expand Down Expand Up @@ -220,7 +223,7 @@ func (s *ConsulSyncer) watchService(ctx context.Context, name string) {
var services []*api.CatalogService
err := backoff.Retry(func() error {
var err error
services, _, err = s.Client.Catalog().Service(name, ConsulK8STag, &api.QueryOptions{
services, _, err = s.Client.Catalog().Service(name, s.ConsulK8STag, &api.QueryOptions{
AllowStale: true,
})
return err
Expand Down Expand Up @@ -269,7 +272,7 @@ func (s *ConsulSyncer) watchService(ctx context.Context, name string) {
//
// Precondition: lock must be held
func (s *ConsulSyncer) scheduleReapServiceLocked(name string) error {
services, _, err := s.Client.Catalog().Service(name, ConsulK8STag, &api.QueryOptions{
services, _, err := s.Client.Catalog().Service(name, s.ConsulK8STag, &api.QueryOptions{
AllowStale: true,
})
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions catalog/from-k8s/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,14 @@ func testRegistration(node, service string) *api.CatalogRegistration {
return &api.CatalogRegistration{
Node: node,
Address: "127.0.0.1",
NodeMeta: map[string]string{ConsulSourceKey: ConsulK8STag},
NodeMeta: map[string]string{ConsulSourceKey: TestConsulK8STag},
SkipNodeUpdate: true,
Service: &api.AgentService{
ID: serviceID(node, service),
Service: service,
Tags: []string{ConsulK8STag},
Tags: []string{TestConsulK8STag},
Meta: map[string]string{
ConsulSourceKey: ConsulK8STag,
ConsulSourceKey: TestConsulK8STag,
ConsulK8SNS: "default",
},
},
Expand All @@ -268,6 +268,7 @@ func testConsulSyncer(t *testing.T, client *api.Client) (*ConsulSyncer, func())
SyncPeriod: 200 * time.Millisecond,
ServicePollPeriod: 50 * time.Millisecond,
Namespace: "default",
ConsulK8STag: TestConsulK8STag,
}

ctx, cancelF := context.WithCancel(context.Background())
Expand Down
4 changes: 4 additions & 0 deletions catalog/from-k8s/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"github.com/hashicorp/consul/api"
)

const (
TestConsulK8STag = "k8s"
)

// TestSyncer implements Syncer for tests, giving easy access to the
// set of registrations.
type TestSyncer struct {
Expand Down
16 changes: 11 additions & 5 deletions subcommand/sync-catalog/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Command struct {
flagToConsul bool
flagToK8S bool
flagConsulDomain string
flagConsulK8STag string
flagK8SDefault bool
flagK8SServicePrefix string
flagK8SSourceNamespace string
Expand Down Expand Up @@ -65,6 +66,8 @@ func (c *Command) init() {
c.flags.StringVar(&c.flagConsulDomain, "consul-domain", "consul",
"The domain for Consul services to use when writing services to "+
"Kubernetes. Defaults to consul.")
c.flags.StringVar(&c.flagConsulK8STag, "consul-k8s-tag", "k8s",
"Tag value for K8S services registered in Consul")
c.flags.Var(&c.flagConsulWritePeriod, "consul-write-interval",
"The interval to perform syncing operations creating Consul services. "+
"All changes are merged and write calls are only made on this "+
Expand Down Expand Up @@ -125,6 +128,7 @@ func (c *Command) Run(args []string) int {
Namespace: c.flagK8SSourceNamespace,
SyncPeriod: syncInterval,
ServicePollPeriod: syncInterval * 2,
ConsulK8STag: c.flagConsulK8STag,
}
go syncer.Run(ctx)

Expand All @@ -137,6 +141,7 @@ func (c *Command) Run(args []string) int {
Syncer: syncer,
Namespace: c.flagK8SSourceNamespace,
ExplicitEnable: !c.flagK8SDefault,
ConsulK8STag: c.flagConsulK8STag,
},
}

Expand All @@ -157,11 +162,12 @@ func (c *Command) Run(args []string) int {
}

source := &catalogFromConsul.Source{
Client: consulClient,
Domain: c.flagConsulDomain,
Sink: sink,
Prefix: c.flagK8SServicePrefix,
Log: hclog.Default().Named("to-k8s/source"),
Client: consulClient,
Domain: c.flagConsulDomain,
Sink: sink,
Prefix: c.flagK8SServicePrefix,
Log: hclog.Default().Named("to-k8s/source"),
ConsulK8STag: c.flagConsulK8STag,
}
go source.Run(ctx)

Expand Down

0 comments on commit b33e67a

Please sign in to comment.