Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Customizable Consul sync tag (fixes #42) #39

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions catalog/from-consul/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ import (
"time"

"github.com/cenkalti/backoff"
fromk8s "github.com/hashicorp/consul-k8s/catalog/from-k8s"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
)

// Source is the source for the sync that watches Consul services and
// updates a Sink whenever the set of services to register changes.
type Source struct {
Client *api.Client // Consul API client
Domain string // Consul DNS domain
Sink Sink // Sink is the sink to update with services
Prefix string // Prefix is a prefix to prepend to services
Log hclog.Logger // Logger
Client *api.Client // Consul API client
Domain string // Consul DNS domain
Sink Sink // Sink is the sink to update with services
Prefix string // Prefix is a prefix to prepend to services
Log hclog.Logger // Logger
ConsulK8STag string // The tag value for services registered
}

// Run is the long-running runloop for watching Consul services and
Expand Down Expand Up @@ -62,7 +62,7 @@ func (s *Source) Run(ctx context.Context) {
// check here.
k8s := false
for _, t := range tags {
if t == fromk8s.ConsulK8STag {
if t == s.ConsulK8STag {
k8s = true
break
}
Expand Down
11 changes: 6 additions & 5 deletions catalog/from-consul/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestSource_ignoreK8S(t *testing.T) {
require.NoError(err)
_, err = client.Catalog().Register(testRegistration("hostB", "svcA", nil), nil)
require.NoError(err)
_, err = client.Catalog().Register(testRegistration("hostB", "svcB", []string{fromk8s.ConsulK8STag}), nil)
_, err = client.Catalog().Register(testRegistration("hostB", "svcB", []string{fromk8s.TestConsulK8STag}), nil)
require.NoError(err)

_, sink, closer := testSource(t, client)
Expand Down Expand Up @@ -249,10 +249,11 @@ func testRegistration(node, service string, tags []string) *api.CatalogRegistrat
func testSource(t *testing.T, client *api.Client) (*Source, *TestSink, func()) {
sink := &TestSink{}
s := &Source{
Client: client,
Domain: "test",
Sink: sink,
Log: hclog.Default(),
Client: client,
Domain: "test",
Sink: sink,
Log: hclog.Default(),
ConsulK8STag: fromk8s.TestConsulK8STag,
}

ctx, cancelF := context.WithCancel(context.Background())
Expand Down
8 changes: 4 additions & 4 deletions catalog/from-k8s/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ const (
// ConsulK8SNS is the key used in the meta to record the namespace
// of the service/node registration.
ConsulK8SNS = "external-k8s-ns"

// ConsulK8STag is the tag value for services registered.
ConsulK8STag = "k8s"
)

// ServiceResource implements controller.Resource to sync Service resource
Expand All @@ -38,6 +35,9 @@ type ServiceResource struct {
Syncer Syncer
Namespace string // K8S namespace to watch

// ConsulK8STag is the tag value for services registered.
ConsulK8STag string

// ExplictEnable should be set to true to require explicit enabling
// using annotations. If this is false, then services are implicitly
// enabled (aka default enabled).
Expand Down Expand Up @@ -240,7 +240,7 @@ func (t *ServiceResource) generateRegistrations(key string) {

baseService := consulapi.AgentService{
Service: svc.Name,
Tags: []string{ConsulK8STag},
Tags: []string{t.ConsulK8STag},
Meta: map[string]string{
ConsulSourceKey: ConsulSourceValue,
ConsulK8SNS: t.namespace(),
Expand Down
7 changes: 4 additions & 3 deletions catalog/from-k8s/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,10 @@ func TestServiceResource_lbAnnotatedTags(t *testing.T) {

// Start the controller
closer := controller.TestControllerRun(&ServiceResource{
Log: hclog.Default(),
Client: client,
Syncer: syncer,
Log: hclog.Default(),
Client: client,
Syncer: syncer,
ConsulK8STag: TestConsulK8STag,
})
defer closer()

Expand Down
9 changes: 6 additions & 3 deletions catalog/from-k8s/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type ConsulSyncer struct {
SyncPeriod time.Duration
ServicePollPeriod time.Duration

// ConsulK8STag is the tag value for services registered.
ConsulK8STag string

lock sync.Mutex
once sync.Once
services map[string]struct{} // set of valid service names
Expand Down Expand Up @@ -175,7 +178,7 @@ func (s *ConsulSyncer) watchReapableServices(ctx context.Context) {
// Go through the service map and find services that should be reaped
for name, tags := range serviceMap {
for _, tag := range tags {
if tag == ConsulK8STag {
if tag == s.ConsulK8STag {
// We only care if we don't know about this service at all.
if _, ok := s.services[name]; ok {
continue
Expand Down Expand Up @@ -220,7 +223,7 @@ func (s *ConsulSyncer) watchService(ctx context.Context, name string) {
var services []*api.CatalogService
err := backoff.Retry(func() error {
var err error
services, _, err = s.Client.Catalog().Service(name, ConsulK8STag, &api.QueryOptions{
services, _, err = s.Client.Catalog().Service(name, s.ConsulK8STag, &api.QueryOptions{
AllowStale: true,
})
return err
Expand Down Expand Up @@ -269,7 +272,7 @@ func (s *ConsulSyncer) watchService(ctx context.Context, name string) {
//
// Precondition: lock must be held
func (s *ConsulSyncer) scheduleReapServiceLocked(name string) error {
services, _, err := s.Client.Catalog().Service(name, ConsulK8STag, &api.QueryOptions{
services, _, err := s.Client.Catalog().Service(name, s.ConsulK8STag, &api.QueryOptions{
AllowStale: true,
})
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions catalog/from-k8s/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,14 @@ func testRegistration(node, service string) *api.CatalogRegistration {
return &api.CatalogRegistration{
Node: node,
Address: "127.0.0.1",
NodeMeta: map[string]string{ConsulSourceKey: ConsulK8STag},
NodeMeta: map[string]string{ConsulSourceKey: TestConsulK8STag},
SkipNodeUpdate: true,
Service: &api.AgentService{
ID: serviceID(node, service),
Service: service,
Tags: []string{ConsulK8STag},
Tags: []string{TestConsulK8STag},
Meta: map[string]string{
ConsulSourceKey: ConsulK8STag,
ConsulSourceKey: TestConsulK8STag,
ConsulK8SNS: "default",
},
},
Expand All @@ -268,6 +268,7 @@ func testConsulSyncer(t *testing.T, client *api.Client) (*ConsulSyncer, func())
SyncPeriod: 200 * time.Millisecond,
ServicePollPeriod: 50 * time.Millisecond,
Namespace: "default",
ConsulK8STag: TestConsulK8STag,
}

ctx, cancelF := context.WithCancel(context.Background())
Expand Down
4 changes: 4 additions & 0 deletions catalog/from-k8s/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"github.com/hashicorp/consul/api"
)

const (
TestConsulK8STag = "k8s"
)

// TestSyncer implements Syncer for tests, giving easy access to the
// set of registrations.
type TestSyncer struct {
Expand Down
16 changes: 11 additions & 5 deletions subcommand/sync-catalog/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Command struct {
flagToConsul bool
flagToK8S bool
flagConsulDomain string
flagConsulK8STag string
flagK8SDefault bool
flagK8SServicePrefix string
flagK8SSourceNamespace string
Expand Down Expand Up @@ -66,6 +67,8 @@ func (c *Command) init() {
c.flags.StringVar(&c.flagConsulDomain, "consul-domain", "consul",
"The domain for Consul services to use when writing services to "+
"Kubernetes. Defaults to consul.")
c.flags.StringVar(&c.flagConsulK8STag, "consul-k8s-tag", "k8s",
"Tag value for K8S services registered in Consul")
c.flags.Var(&c.flagConsulWritePeriod, "consul-write-interval",
"The interval to perform syncing operations creating Consul services. "+
"All changes are merged and write calls are only made on this "+
Expand Down Expand Up @@ -129,6 +132,7 @@ func (c *Command) Run(args []string) int {
Namespace: c.flagK8SSourceNamespace,
SyncPeriod: syncInterval,
ServicePollPeriod: syncInterval * 2,
ConsulK8STag: c.flagConsulK8STag,
}
go syncer.Run(ctx)

Expand All @@ -142,6 +146,7 @@ func (c *Command) Run(args []string) int {
Namespace: c.flagK8SSourceNamespace,
ExplicitEnable: !c.flagK8SDefault,
ClusterIPSync: c.flagSyncClusterIPServices,
ConsulK8STag: c.flagConsulK8STag,
},
}

Expand All @@ -162,11 +167,12 @@ func (c *Command) Run(args []string) int {
}

source := &catalogFromConsul.Source{
Client: consulClient,
Domain: c.flagConsulDomain,
Sink: sink,
Prefix: c.flagK8SServicePrefix,
Log: hclog.Default().Named("to-k8s/source"),
Client: consulClient,
Domain: c.flagConsulDomain,
Sink: sink,
Prefix: c.flagK8SServicePrefix,
Log: hclog.Default().Named("to-k8s/source"),
ConsulK8STag: c.flagConsulK8STag,
}
go source.Run(ctx)

Expand Down