diff --git a/CHANGELOG.md b/CHANGELOG.md index d7639bd8ef..9b11b66ea1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ ## UNRELEASED +IMPROVEMENTS: + +* Add an ability to configure the synthetic Consul node name where catalog sync registers services. [[GH-312](https://github.com/hashicorp/consul-k8s/pull/312)] + * Sync: Add `-consul-node-name` flag to the `sync-catalog` command to configure the Consul node name for syncing services to Consul. + * ACLs: Add `-sync-consul-node-name` flag to the server-acl-init command so that it can create correct policy for the sync catalog. + ## 0.18.1 (August 10, 2020) BUG FIXES: diff --git a/catalog/to-consul/resource.go b/catalog/to-consul/resource.go index b70c6a4a44..ab35f1b23e 100644 --- a/catalog/to-consul/resource.go +++ b/catalog/to-consul/resource.go @@ -115,6 +115,9 @@ type ServiceResource struct { // `k8s-default` namespace. K8SNSMirroringPrefix string + // The Consul node name to register service with. + ConsulNodeName string + // serviceLock must be held for any read/write to these maps. serviceLock sync.RWMutex @@ -332,7 +335,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // shallow copied for each instance. baseNode := consulapi.CatalogRegistration{ SkipNodeUpdate: true, - Node: ConsulSyncNodeName, + Node: t.ConsulNodeName, Address: "127.0.0.1", NodeMeta: map[string]string{ ConsulSourceKey: ConsulSourceValue, diff --git a/catalog/to-consul/resource_test.go b/catalog/to-consul/resource_test.go index c0c05bb333..6a815bbc1d 100644 --- a/catalog/to-consul/resource_test.go +++ b/catalog/to-consul/resource_test.go @@ -27,7 +27,7 @@ func init() { func TestServiceResource_createDelete(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -55,7 +55,7 @@ func TestServiceResource_createDelete(t *testing.T) { func TestServiceResource_defaultEnable(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -80,7 +80,7 @@ func TestServiceResource_defaultEnable(t *testing.T) { func TestServiceResource_defaultEnableDisable(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -106,7 +106,7 @@ func TestServiceResource_defaultEnableDisable(t *testing.T) { func TestServiceResource_defaultDisable(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ExplicitEnable = true @@ -132,7 +132,7 @@ func TestServiceResource_defaultDisable(t *testing.T) { func TestServiceResource_defaultDisableEnable(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ExplicitEnable = true @@ -159,7 +159,7 @@ func TestServiceResource_defaultDisableEnable(t *testing.T) { func TestServiceResource_changeSyncToFalse(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ExplicitEnable = true @@ -200,7 +200,7 @@ func TestServiceResource_changeSyncToFalse(t *testing.T) { func TestServiceResource_addK8SNamespace(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.AddK8SNamespaceSuffix = true @@ -228,7 +228,7 @@ func TestServiceResource_addK8SNamespace(t *testing.T) { func TestServiceResource_addK8SNamespaceWithPrefix(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.AddK8SNamespaceSuffix = true serviceResource.ConsulServicePrefix = "prefix" @@ -252,12 +252,40 @@ func TestServiceResource_addK8SNamespaceWithPrefix(t *testing.T) { }) } +// Test that when consul node name is set to a non-default value, +// services are synced to that node. +func TestServiceResource_ConsulNodeName(t *testing.T) { + t.Parallel() + client := fake.NewSimpleClientset() + syncer := newTestSyncer() + serviceResource := defaultServiceResource(client, syncer) + serviceResource.ConsulNodeName = "test-node" + + // Start the controller + closer := controller.TestControllerRun(&serviceResource) + defer closer() + + // Insert an LB service with the sync=true + svc := lbService("foo", "namespace", "1.2.3.4") + _, err := client.CoreV1().Services("namespace").Create(context.Background(), svc, metav1.CreateOptions{}) + require.NoError(t, err) + + // Verify that the service name has k8s namespace appended with an '-' + retry.Run(t, func(r *retry.R) { + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(r, actual, 1) + require.Equal(r, actual[0].Node, "test-node") + }) +} + // Test k8s namespace suffix is not appended // when the service name annotation is provided func TestServiceResource_addK8SNamespaceWithNameAnnotation(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.AddK8SNamespaceSuffix = true @@ -285,7 +313,7 @@ func TestServiceResource_addK8SNamespaceWithNameAnnotation(t *testing.T) { func TestServiceResource_externalIP(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -315,7 +343,7 @@ func TestServiceResource_externalIP(t *testing.T) { func TestServiceResource_externalIPPrefix(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ConsulServicePrefix = "prefix" @@ -346,7 +374,7 @@ func TestServiceResource_externalIPPrefix(t *testing.T) { func TestServiceResource_lb(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -373,7 +401,7 @@ func TestServiceResource_lb(t *testing.T) { func TestServiceResource_lbPrefix(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ConsulServicePrefix = "prefix" @@ -402,7 +430,7 @@ func TestServiceResource_lbPrefix(t *testing.T) { func TestServiceResource_lbMultiEndpoint(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -436,7 +464,7 @@ func TestServiceResource_lbMultiEndpoint(t *testing.T) { func TestServiceResource_lbAnnotatedName(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -463,7 +491,7 @@ func TestServiceResource_lbAnnotatedName(t *testing.T) { func TestServiceResource_lbPort(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -495,7 +523,7 @@ func TestServiceResource_lbPort(t *testing.T) { func TestServiceResource_lbAnnotatedPort(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -528,7 +556,7 @@ func TestServiceResource_lbAnnotatedPort(t *testing.T) { func TestServiceResource_lbAnnotatedTags(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ConsulK8STag = TestConsulK8STag @@ -556,7 +584,7 @@ func TestServiceResource_lbAnnotatedTags(t *testing.T) { func TestServiceResource_lbAnnotatedMeta(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) // Start the controller @@ -583,7 +611,7 @@ func TestServiceResource_lbAnnotatedMeta(t *testing.T) { func TestServiceResource_lbRegisterEndpoints(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.LoadBalancerEndpointsSync = true @@ -637,7 +665,7 @@ func TestServiceResource_lbRegisterEndpoints(t *testing.T) { // Test that the proper registrations are generated for a NodePort type. func TestServiceResource_nodePort(t *testing.T) { t.Parallel() - syncer := &TestSyncer{} + syncer := newTestSyncer() client := fake.NewSimpleClientset() serviceResource := defaultServiceResource(client, syncer) serviceResource.NodePortSync = ExternalOnly @@ -677,7 +705,7 @@ func TestServiceResource_nodePort(t *testing.T) { func TestServiceResource_nodePortPrefix(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.NodePortSync = ExternalOnly serviceResource.ConsulServicePrefix = "prefix" @@ -718,7 +746,7 @@ func TestServiceResource_nodePortPrefix(t *testing.T) { func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.NodePortSync = ExternalOnly @@ -773,7 +801,7 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.NodePortSync = ExternalOnly @@ -813,7 +841,7 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { func TestServiceResource_nodePortUnnamedPort(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.NodePortSync = ExternalOnly @@ -858,7 +886,7 @@ func TestServiceResource_nodePortUnnamedPort(t *testing.T) { func TestServiceResource_nodePort_internalOnlySync(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.NodePortSync = InternalOnly @@ -898,7 +926,7 @@ func TestServiceResource_nodePort_internalOnlySync(t *testing.T) { func TestServiceResource_nodePort_externalFirstSync(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.NodePortSync = ExternalFirst @@ -945,7 +973,7 @@ func TestServiceResource_nodePort_externalFirstSync(t *testing.T) { func TestServiceResource_clusterIP(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ClusterIPSync = true @@ -981,7 +1009,7 @@ func TestServiceResource_clusterIP(t *testing.T) { func TestServiceResource_clusterIPPrefix(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ClusterIPSync = true serviceResource.ConsulServicePrefix = "prefix" @@ -1019,7 +1047,7 @@ func TestServiceResource_clusterIPPrefix(t *testing.T) { func TestServiceResource_clusterIPAnnotatedPortName(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ClusterIPSync = true @@ -1057,7 +1085,7 @@ func TestServiceResource_clusterIPAnnotatedPortName(t *testing.T) { func TestServiceResource_clusterIPAnnotatedPortNumber(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ClusterIPSync = true @@ -1094,7 +1122,7 @@ func TestServiceResource_clusterIPAnnotatedPortNumber(t *testing.T) { func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ClusterIPSync = true @@ -1135,7 +1163,7 @@ func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) { func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ClusterIPSync = false @@ -1164,7 +1192,7 @@ func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { func TestServiceResource_clusterIPAllNamespaces(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() testNamespace := "test_namespace" serviceResource := defaultServiceResource(client, syncer) serviceResource.ClusterIPSync = true @@ -1201,7 +1229,7 @@ func TestServiceResource_clusterIPAllNamespaces(t *testing.T) { func TestServiceResource_clusterIPTargetPortNamed(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ClusterIPSync = true @@ -1282,7 +1310,7 @@ func TestServiceResource_AllowDenyNamespaces(t *testing.T) { for name, c := range cases { t.Run(name, func(tt *testing.T) { client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.AllowK8sNamespacesSet = c.AllowList serviceResource.DenyK8sNamespacesSet = c.DenyList @@ -1333,7 +1361,7 @@ func TestServiceResource_singleDestNamespace(t *testing.T) { for _, consulDestNamespace := range consulDestNamespaces { t.Run(consulDestNamespace, func(tt *testing.T) { client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.ConsulDestinationNamespace = consulDestNamespace serviceResource.EnableNamespaces = true @@ -1358,7 +1386,7 @@ func TestServiceResource_singleDestNamespace(t *testing.T) { func TestServiceResource_MirroredNamespace(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.EnableK8SNSMirroring = true serviceResource.EnableNamespaces = true @@ -1393,7 +1421,7 @@ func TestServiceResource_MirroredNamespace(t *testing.T) { func TestServiceResource_MirroredPrefixNamespace(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - syncer := &TestSyncer{} + syncer := newTestSyncer() serviceResource := defaultServiceResource(client, syncer) serviceResource.EnableK8SNSMirroring = true serviceResource.EnableNamespaces = true @@ -1569,5 +1597,6 @@ func defaultServiceResource(client kubernetes.Interface, syncer Syncer) ServiceR Syncer: syncer, AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), + ConsulNodeName: ConsulSyncNodeName, } } diff --git a/catalog/to-consul/syncer.go b/catalog/to-consul/syncer.go index d51282cb33..cfb628f76d 100644 --- a/catalog/to-consul/syncer.go +++ b/catalog/to-consul/syncer.go @@ -19,10 +19,6 @@ const ( // ConsulServicePollPeriod is how often a service is checked for // whether it has instances to reap. ConsulServicePollPeriod = 60 * time.Second - - // ConsulSyncNodeName is the name of the node in Consul that we register - // services on. It's not a real node backed by a Consul agent. - ConsulSyncNodeName = "k8s-sync" ) // Syncer is responsible for syncing a set of Consul catalog registrations. @@ -69,6 +65,9 @@ type ConsulSyncer struct { // ConsulK8STag is the tag value for services registered. ConsulK8STag string + // The Consul node name to register services with. + ConsulNodeName string + // ConsulNodeServicesClient is used to list services for a node. We use a // separate client for this API call that handles older version of Consul. ConsulNodeServicesClient ConsulNodeServicesClient @@ -189,7 +188,7 @@ func (s *ConsulSyncer) watchReapableServices(ctx context.Context) { var meta *api.QueryMeta err := backoff.Retry(func() error { var err error - services, meta, err = s.ConsulNodeServicesClient.NodeServices(s.ConsulK8STag, ConsulSyncNodeName, *opts) + services, meta, err = s.ConsulNodeServicesClient.NodeServices(s.ConsulK8STag, s.ConsulNodeName, *opts) return err }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)) diff --git a/catalog/to-consul/syncer_test.go b/catalog/to-consul/syncer_test.go index b454bb5e9b..f42f6fee46 100644 --- a/catalog/to-consul/syncer_test.go +++ b/catalog/to-consul/syncer_test.go @@ -2,6 +2,7 @@ package catalog import ( "context" + "fmt" "net/http" "net/http/httptest" "testing" @@ -14,6 +15,12 @@ import ( "github.com/stretchr/testify/require" ) +const ( + // ConsulSyncNodeName is the name of the node in Consul that we register + // services on. It's not a real node backed by a Consul agent. + ConsulSyncNodeName = "k8s-sync" +) + func TestConsulSyncer_register(t *testing.T) { t.Parallel() require := require.New(t) @@ -58,61 +65,67 @@ func TestConsulSyncer_register(t *testing.T) { // Test that the syncer reaps individual invalid service instances. func TestConsulSyncer_reapServiceInstance(t *testing.T) { t.Parallel() - require := require.New(t) - // Set up server, client, syncer - a, err := testutil.NewTestServerConfigT(t, nil) - require.NoError(err) - defer a.Stop() + for _, node := range []string{ConsulSyncNodeName, "test-node"} { + name := fmt.Sprintf("consul node name: %s", node) + t.Run(name, func(t *testing.T) { + require := require.New(t) - client, err := api.NewClient(&api.Config{ - Address: a.HTTPAddr, - }) - require.NoError(err) + // Set up server, client, syncer + a, err := testutil.NewTestServerConfigT(t, nil) + require.NoError(err) + defer a.Stop() - s, closer := testConsulSyncer(client) - defer closer() + client, err := api.NewClient(&api.Config{ + Address: a.HTTPAddr, + }) + require.NoError(err) - // Sync - s.Sync([]*api.CatalogRegistration{ - testRegistration(ConsulSyncNodeName, "bar", "default"), - }) + s, closer := testConsulSyncer(client) + defer closer() - // Wait for the first service - retry.Run(t, func(r *retry.R) { - services, _, err := client.Catalog().Service("bar", "", nil) - if err != nil { - r.Fatalf("err: %s", err) - } - if len(services) != 1 { - r.Fatal("service not found or too many") - } - }) + // Sync + s.Sync([]*api.CatalogRegistration{ + testRegistration(node, "bar", "default"), + }) - // Create an invalid service directly in Consul - svc := testRegistration(ConsulSyncNodeName, "bar", "default") - svc.Service.ID = serviceID("k8s-sync", "bar2") - _, err = client.Catalog().Register(svc, nil) - require.NoError(err) + // Wait for the first service + retry.Run(t, func(r *retry.R) { + services, _, err := client.Catalog().Service("bar", "", nil) + if err != nil { + r.Fatalf("err: %s", err) + } + if len(services) != 1 { + r.Fatal("service not found or too many") + } + }) - // Valid service should exist - var service *api.CatalogService - retry.Run(t, func(r *retry.R) { - services, _, err := client.Catalog().Service("bar", "", nil) - if err != nil { - r.Fatalf("err: %s", err) - } - if len(services) != 1 { - r.Fatal("service not found or too many") - } - service = services[0] - }) + // Create an invalid service directly in Consul + svc := testRegistration(node, "bar", "default") + svc.Service.ID = serviceID(node, "bar2") + _, err = client.Catalog().Register(svc, nil) + require.NoError(err) + + // Valid service should exist + var service *api.CatalogService + retry.Run(t, func(r *retry.R) { + services, _, err := client.Catalog().Service("bar", "", nil) + if err != nil { + r.Fatalf("err: %s", err) + } + if len(services) != 1 { + r.Fatal("service not found or too many") + } + service = services[0] + }) - // Verify the settings - require.Equal(serviceID("k8s-sync", "bar"), service.ServiceID) - require.Equal("k8s-sync", service.Node) - require.Equal("bar", service.ServiceName) - require.Equal("127.0.0.1", service.Address) + // Verify the settings + require.Equal(serviceID(node, "bar"), service.ServiceID) + require.Equal(node, service.Node) + require.Equal("bar", service.ServiceName) + require.Equal("127.0.0.1", service.Address) + }) + } } // Test that the syncer reaps services not registered by us that are tagged @@ -282,6 +295,7 @@ func testConsulSyncerWithConfig(client *api.Client, configurator func(*ConsulSyn SyncPeriod: 200 * time.Millisecond, ServicePollPeriod: 50 * time.Millisecond, ConsulK8STag: TestConsulK8STag, + ConsulNodeName: ConsulSyncNodeName, ConsulNodeServicesClient: &PreNamespacesNodeServicesClient{ Client: client, }, diff --git a/catalog/to-consul/testing.go b/catalog/to-consul/testing.go index 8379a943ac..89813ac5d6 100644 --- a/catalog/to-consul/testing.go +++ b/catalog/to-consul/testing.go @@ -10,16 +10,20 @@ const ( TestConsulK8STag = "k8s" ) -// TestSyncer implements Syncer for tests, giving easy access to the +// testSyncer implements Syncer for tests, giving easy access to the // set of registrations. -type TestSyncer struct { +type testSyncer struct { sync.Mutex // Lock should be held while accessing Registrations Registrations []*api.CatalogRegistration } // Sync implements Syncer -func (s *TestSyncer) Sync(rs []*api.CatalogRegistration) { +func (s *testSyncer) Sync(rs []*api.CatalogRegistration) { s.Lock() defer s.Unlock() s.Registrations = rs } + +func newTestSyncer() *testSyncer { + return &testSyncer{} +} diff --git a/subcommand/server-acl-init/command.go b/subcommand/server-acl-init/command.go index 41a3fc56f2..72ead467ce 100644 --- a/subcommand/server-acl-init/command.go +++ b/subcommand/server-acl-init/command.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "os" + "regexp" "strings" "sync" "time" @@ -38,7 +39,8 @@ type Command struct { flagCreateClientToken bool - flagCreateSyncToken bool + flagCreateSyncToken bool + flagSyncConsulNodeName string flagCreateInjectToken bool flagCreateInjectAuthMethod bool @@ -105,8 +107,12 @@ func (c *Command) init() { "Toggle for updating the anonymous token to allow DNS queries to work") c.flags.BoolVar(&c.flagCreateClientToken, "create-client-token", true, "Toggle for creating a client agent token. Default is true.") + c.flags.BoolVar(&c.flagCreateSyncToken, "create-sync-token", false, "Toggle for creating a catalog sync token.") + c.flags.StringVar(&c.flagSyncConsulNodeName, "sync-consul-node-name", "k8s-sync", + "The Consul node name to register for catalog sync. Defaults to k8s-sync. To be discoverable "+ + "via DNS, the name should only contain alpha-numerics and dashes.") c.flags.BoolVar(&c.flagCreateInjectToken, "create-inject-namespace-token", false, "Toggle for creating a connect injector token. Only required when namespaces are enabled.") @@ -210,12 +216,10 @@ func (c *Command) Run(args []string) int { c.UI.Error("Should have no non-flag arguments.") return 1 } - if len(c.flagServerAddresses) == 0 { - c.UI.Error("-server-address must be set at least once") - return 1 - } - if c.flagResourcePrefix == "" { - c.UI.Error("-resource-prefix must be set") + + // Validate flags + if err := c.validateFlags(); err != nil { + c.UI.Error(err.Error()) return 1 } @@ -751,6 +755,39 @@ func (c *Command) createAnonymousPolicy() bool { (c.flagCreateInjectAuthMethod && c.flagCreateACLReplicationToken)) } +func (c *Command) validateFlags() error { + if len(c.flagServerAddresses) == 0 { + return errors.New("-server-address must be set at least once") + } + + if c.flagResourcePrefix == "" { + return errors.New("-resource-prefix must be set") + } + + // For the Consul node name to be discoverable via DNS, it must contain only + // dashes and alphanumeric characters. Length is also constrained. + // These restrictions match those defined in Consul's agent definition. + var invalidDnsRe = regexp.MustCompile(`[^A-Za-z0-9\\-]+`) + const maxDNSLabelLength = 63 + + if invalidDnsRe.MatchString(c.flagSyncConsulNodeName) { + return fmt.Errorf("-sync-consul-node-name=%s is invalid: node name will not be discoverable "+ + "via DNS due to invalid characters. Valid characters include "+ + "all alpha-numerics and dashes", + c.flagSyncConsulNodeName, + ) + } + if len(c.flagSyncConsulNodeName) > maxDNSLabelLength { + return fmt.Errorf("-sync-consul-node-name=%s is invalid: node name will not be discoverable "+ + "via DNS due to it being too long. Valid lengths are between "+ + "1 and 63 bytes", + c.flagSyncConsulNodeName, + ) + } + + return nil +} + const consulDefaultNamespace = "default" const synopsis = "Initialize ACLs on Consul servers and other components." const help = ` diff --git a/subcommand/server-acl-init/command_test.go b/subcommand/server-acl-init/command_test.go index a26748e08e..7d72910755 100644 --- a/subcommand/server-acl-init/command_test.go +++ b/subcommand/server-acl-init/command_test.go @@ -56,6 +56,24 @@ func TestRun_FlagValidation(t *testing.T) { Flags: []string{"-bootstrap-token-file=/notexist", "-server-address=localhost", "-resource-prefix=prefix"}, ExpErr: "Unable to read bootstrap token from file \"/notexist\": open /notexist: no such file or directory", }, + { + Flags: []string{ + "-server-address=localhost", + "-resource-prefix=prefix", + "-sync-consul-node-name=Speci@l_Chars", + }, + ExpErr: "-sync-consul-node-name=Speci@l_Chars is invalid: node name will not be discoverable " + + "via DNS due to invalid characters. Valid characters include all alpha-numerics and dashes", + }, + { + Flags: []string{ + "-server-address=localhost", + "-resource-prefix=prefix", + "-sync-consul-node-name=5r9OPGfSRXUdGzNjBdAwmhCBrzHDNYs4XjZVR4wp7lSLIzqwS0ta51nBLIN0TMPV-too-long", + }, + ExpErr: "-sync-consul-node-name=5r9OPGfSRXUdGzNjBdAwmhCBrzHDNYs4XjZVR4wp7lSLIzqwS0ta51nBLIN0TMPV-too-long is invalid: node name will not be discoverable " + + "via DNS due to it being too long. Valid lengths are between 1 and 63 bytes", + }, } for _, c := range cases { @@ -877,7 +895,7 @@ func TestRun_BindingRuleUpdates(t *testing.T) { firstRunArgs := append(commonArgs, "-acl-binding-rule-selector=serviceaccount.name!=default", ) - // Our second run, we change the binding rule selector. + // On the second run, we change the binding rule selector. secondRunArgs := append(commonArgs, "-acl-binding-rule-selector=serviceaccount.name!=changed", ) @@ -933,6 +951,83 @@ func TestRun_BindingRuleUpdates(t *testing.T) { } } +// Test that the catalog sync policy is updated if the Consul node name changes. +func TestRun_SyncPolicyUpdates(t *testing.T) { + t.Parallel() + k8s, testSvr := completeSetup(t) + defer testSvr.Stop() + require := require.New(t) + + ui := cli.NewMockUi() + commonArgs := []string{ + "-resource-prefix=" + resourcePrefix, + "-k8s-namespace=" + ns, + "-server-address", strings.Split(testSvr.HTTPAddr, ":")[0], + "-server-port", strings.Split(testSvr.HTTPAddr, ":")[1], + "-create-sync-token", + } + firstRunArgs := append(commonArgs, + "-sync-consul-node-name=k8s-sync", + ) + // On the second run, we change the sync node name. + secondRunArgs := append(commonArgs, + "-sync-consul-node-name=new-node-name", + ) + + // Run the command first to populate the sync policy. + cmd := Command{ + UI: ui, + clientset: k8s, + } + responseCode := cmd.Run(firstRunArgs) + require.Equal(0, responseCode, ui.ErrorWriter.String()) + + // Create consul client + bootToken := getBootToken(t, k8s, resourcePrefix, ns) + consul, err := api.NewClient(&api.Config{ + Address: testSvr.HTTPAddr, + Token: bootToken, + }) + require.NoError(err) + + // Get and check the sync policy details + firstPolicies, _, err := consul.ACL().PolicyList(nil) + require.NoError(err) + + for _, p := range firstPolicies { + if p.Name == "catalog-sync-token" { + policy, _, err := consul.ACL().PolicyRead(p.ID, nil) + require.NoError(err) + + // Check the node name in the policy + require.Contains(policy.Rules, "k8s-sync") + } + } + + // Re-run the command with a new Consul node name. The sync policy should be updated. + // NOTE: We're redefining the command so that the old flag values are reset. + cmd = Command{ + UI: ui, + clientset: k8s, + } + responseCode = cmd.Run(secondRunArgs) + require.Equal(0, responseCode, ui.ErrorWriter.String()) + + // Get and check the sync policy details + secondPolicies, _, err := consul.ACL().PolicyList(nil) + require.NoError(err) + + for _, p := range secondPolicies { + if p.Name == "catalog-sync-token" { + policy, _, err := consul.ACL().PolicyRead(p.ID, nil) + require.NoError(err) + + // Check the node name in the policy + require.Contains(policy.Rules, "new-node-name") + } + } +} + // Test that if the servers aren't available at first that bootstrap // still succeeds. func TestRun_DelayedServers(t *testing.T) { diff --git a/subcommand/server-acl-init/create_or_update.go b/subcommand/server-acl-init/create_or_update.go index c8b4b28f58..cf847fff9c 100644 --- a/subcommand/server-acl-init/create_or_update.go +++ b/subcommand/server-acl-init/create_or_update.go @@ -107,8 +107,10 @@ func (c *Command) createOrUpdateACLPolicy(policy api.ACLPolicy, consulClient *ap // Consul version with namespace support or changes any of their namespace // settings, the policies associated with their ACL tokens will need to be // updated to be namespace aware. + // Allowing the Consul node name to be configurable also requires any sync + // policy to be updated in case the node name has changed. if isPolicyExistsErr(err, policy.Name) { - if c.flagEnableNamespaces { + if c.flagEnableNamespaces || c.flagCreateSyncToken { c.log.Info(fmt.Sprintf("Policy %q already exists, updating", policy.Name)) // The policy ID is required in any PolicyUpdate call, so first we need to diff --git a/subcommand/server-acl-init/rules.go b/subcommand/server-acl-init/rules.go index 1392861419..cd0e4fcb9f 100644 --- a/subcommand/server-acl-init/rules.go +++ b/subcommand/server-acl-init/rules.go @@ -11,6 +11,7 @@ type rulesData struct { ConsulSyncDestinationNamespace string EnableSyncK8SNSMirroring bool SyncK8SNSMirroringPrefix string + SyncConsulNodeName string } type gatewayRulesData struct { @@ -174,7 +175,7 @@ namespace "{{ .GatewayNamespace }}" { func (c *Command) syncRules() (string, error) { syncRulesTpl := ` - node "k8s-sync" { + node "{{ .SyncConsulNodeName }}" { policy = "write" } {{- if .EnableNamespaces }} @@ -246,6 +247,7 @@ func (c *Command) rulesData() rulesData { ConsulSyncDestinationNamespace: c.flagConsulSyncDestinationNamespace, EnableSyncK8SNSMirroring: c.flagEnableSyncK8SNSMirroring, SyncK8SNSMirroringPrefix: c.flagSyncK8SNSMirroringPrefix, + SyncConsulNodeName: c.flagSyncConsulNodeName, } } diff --git a/subcommand/server-acl-init/rules_test.go b/subcommand/server-acl-init/rules_test.go index 335475dd6f..2adaa3b689 100644 --- a/subcommand/server-acl-init/rules_test.go +++ b/subcommand/server-acl-init/rules_test.go @@ -314,6 +314,7 @@ func TestSyncRules(t *testing.T) { ConsulSyncDestinationNamespace string EnableSyncK8SNSMirroring bool SyncK8SNSMirroringPrefix string + SyncConsulNodeName string Expected string }{ { @@ -322,12 +323,30 @@ func TestSyncRules(t *testing.T) { "sync-namespace", true, "prefix-", + "k8s-sync", `node "k8s-sync" { policy = "write" } node_prefix "" { policy = "read" } + service_prefix "" { + policy = "write" + }`, + }, + { + "Namespaces are disabled, non-default node name", + false, + "sync-namespace", + true, + "prefix-", + "new-node-name", + `node "new-node-name" { + policy = "write" + } + node_prefix "" { + policy = "read" + } service_prefix "" { policy = "write" }`, @@ -338,10 +357,31 @@ func TestSyncRules(t *testing.T) { "sync-namespace", false, "prefix-", + "k8s-sync", `node "k8s-sync" { policy = "write" } operator = "write" +namespace "sync-namespace" { + node_prefix "" { + policy = "read" + } + service_prefix "" { + policy = "write" + } +}`, + }, + { + "Namespaces are enabled, mirroring disabled, non-default node name", + true, + "sync-namespace", + false, + "prefix-", + "new-node-name", + `node "new-node-name" { + policy = "write" + } +operator = "write" namespace "sync-namespace" { node_prefix "" { policy = "read" @@ -357,10 +397,31 @@ namespace "sync-namespace" { "sync-namespace", true, "", + "k8s-sync", `node "k8s-sync" { policy = "write" } operator = "write" +namespace_prefix "" { + node_prefix "" { + policy = "read" + } + service_prefix "" { + policy = "write" + } +}`, + }, + { + "Namespaces are enabled, mirroring enabled, prefix empty, non-default node name", + true, + "sync-namespace", + true, + "", + "new-node-name", + `node "new-node-name" { + policy = "write" + } +operator = "write" namespace_prefix "" { node_prefix "" { policy = "read" @@ -376,10 +437,31 @@ namespace_prefix "" { "sync-namespace", true, "prefix-", + "k8s-sync", `node "k8s-sync" { policy = "write" } operator = "write" +namespace_prefix "prefix-" { + node_prefix "" { + policy = "read" + } + service_prefix "" { + policy = "write" + } +}`, + }, + { + "Namespaces are enabled, mirroring enabled, prefix defined, non-default node name", + true, + "sync-namespace", + true, + "prefix-", + "new-node-name", + `node "new-node-name" { + policy = "write" + } +operator = "write" namespace_prefix "prefix-" { node_prefix "" { policy = "read" @@ -400,6 +482,7 @@ namespace_prefix "prefix-" { flagConsulSyncDestinationNamespace: tt.ConsulSyncDestinationNamespace, flagEnableSyncK8SNSMirroring: tt.EnableSyncK8SNSMirroring, flagSyncK8SNSMirroringPrefix: tt.SyncK8SNSMirroringPrefix, + flagSyncConsulNodeName: tt.SyncConsulNodeName, } syncRules, err := cmd.syncRules() diff --git a/subcommand/sync-catalog/command.go b/subcommand/sync-catalog/command.go index cc49e2727b..a9911ace0a 100644 --- a/subcommand/sync-catalog/command.go +++ b/subcommand/sync-catalog/command.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "os/signal" + "regexp" "sync" "time" @@ -37,6 +38,7 @@ type Command struct { flagToK8S bool flagConsulDomain string flagConsulK8STag string + flagConsulNodeName string flagK8SDefault bool flagK8SServicePrefix string flagConsulServicePrefix string @@ -95,6 +97,9 @@ func (c *Command) init() { "Kubernetes. Defaults to consul.") c.flags.StringVar(&c.flagConsulK8STag, "consul-k8s-tag", "k8s", "Tag value for K8S services registered in Consul") + c.flags.StringVar(&c.flagConsulNodeName, "consul-node-name", "k8s-sync", + "The Consul node name to register for catalog sync. Defaults to k8s-sync. To be discoverable "+ + "via DNS, the name should only contain alpha-numerics and dashes.") c.flags.DurationVar(&c.flagConsulWritePeriod, "consul-write-interval", 30*time.Second, "The interval to perform syncing operations creating Consul services, formatted "+ "as a time.Duration. All changes are merged and write calls are only made "+ @@ -115,6 +120,7 @@ func (c *Command) init() { c.flags.StringVar(&c.flagLogLevel, "log-level", "info", "Log verbosity level. Supported values (in order of detail) are \"trace\", "+ "\"debug\", \"info\", \"warn\", and \"error\".") + c.flags.Var((*flags.AppendSliceValue)(&c.flagAllowK8sNamespacesList), "allow-k8s-namespace", "K8s namespaces to explicitly allow. May be specified multiple times.") c.flags.Var((*flags.AppendSliceValue)(&c.flagDenyK8sNamespacesList), "deny-k8s-namespace", @@ -158,6 +164,12 @@ func (c *Command) Run(args []string) int { return 1 } + // Validate flags + if err := c.validateFlags(); err != nil { + c.UI.Error(err.Error()) + return 1 + } + // Create the k8s clientset if c.clientset == nil { config, err := subcommand.K8SConfig(c.k8s.KubeConfig()) @@ -243,6 +255,7 @@ func (c *Command) Run(args []string) int { SyncPeriod: c.flagConsulWritePeriod, ServicePollPeriod: c.flagConsulWritePeriod * 2, ConsulK8STag: c.flagConsulK8STag, + ConsulNodeName: c.flagConsulNodeName, ConsulNodeServicesClient: svcsClient, } go syncer.Run(ctx) @@ -267,6 +280,7 @@ func (c *Command) Run(args []string) int { ConsulDestinationNamespace: c.flagConsulDestinationNamespace, EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, + ConsulNodeName: c.flagConsulNodeName, }, } @@ -375,6 +389,29 @@ func (c *Command) interrupt() { c.sigCh <- os.Interrupt } +func (c *Command) validateFlags() error { + // For the Consul node name to be discoverable via DNS, it must contain only + // dashes and alphanumeric characters. Length is also constrained. + // These restrictions match those defined in Consul's agent definition. + var invalidDnsRe = regexp.MustCompile(`[^A-Za-z0-9\\-]+`) + const maxDNSLabelLength = 63 + + if invalidDnsRe.MatchString(c.flagConsulNodeName) { + return fmt.Errorf("-consul-node-name=%s is invalid: node name will not be discoverable "+ + "via DNS due to invalid characters. Valid characters include all alpha-numerics and dashes", + c.flagConsulNodeName, + ) + } + if len(c.flagConsulNodeName) > maxDNSLabelLength { + return fmt.Errorf("-consul-node-name=%s is invalid: node name will not be discoverable "+ + "via DNS due to it being too long. Valid lengths are between 1 and 63 bytes", + c.flagConsulNodeName, + ) + } + + return nil +} + const synopsis = "Sync Kubernetes services and Consul services." const help = ` Usage: consul-k8s sync-catalog [options] diff --git a/subcommand/sync-catalog/command_test.go b/subcommand/sync-catalog/command_test.go index 121c769dd5..3c62a2b68b 100644 --- a/subcommand/sync-catalog/command_test.go +++ b/subcommand/sync-catalog/command_test.go @@ -16,6 +16,39 @@ import ( "k8s.io/client-go/kubernetes/fake" ) +// Test flag validation +func TestRun_FlagValidation(t *testing.T) { + t.Parallel() + + cases := []struct { + Flags []string + ExpErr string + }{ + { + Flags: []string{"-consul-node-name=Speci@l_Chars"}, + ExpErr: "-consul-node-name=Speci@l_Chars is invalid: node name will not be discoverable " + + "via DNS due to invalid characters. Valid characters include all alpha-numerics and dashes", + }, + { + Flags: []string{"-consul-node-name=5r9OPGfSRXUdGzNjBdAwmhCBrzHDNYs4XjZVR4wp7lSLIzqwS0ta51nBLIN0TMPV-too-long"}, + ExpErr: "-consul-node-name=5r9OPGfSRXUdGzNjBdAwmhCBrzHDNYs4XjZVR4wp7lSLIzqwS0ta51nBLIN0TMPV-too-long is invalid: node name will not be discoverable " + + "via DNS due to it being too long. Valid lengths are between 1 and 63 bytes", + }, + } + + for _, c := range cases { + t.Run(c.ExpErr, func(t *testing.T) { + ui := cli.NewMockUi() + cmd := Command{ + UI: ui, + } + responseCode := cmd.Run(c.Flags) + require.Equal(t, 1, responseCode, ui.ErrorWriter.String()) + require.Contains(t, ui.ErrorWriter.String(), c.ExpErr) + }) + } +} + // Test that the default consul service is synced to k8s func TestRun_Defaults_SyncsConsulServiceToK8s(t *testing.T) { t.Parallel()