diff --git a/.changelog/17235.txt b/.changelog/17235.txt new file mode 100644 index 000000000000..3356b715ef31 --- /dev/null +++ b/.changelog/17235.txt @@ -0,0 +1,3 @@ +```release-note:bug +peering: Fix issue where peer streams could incorrectly deregister services in various scenarios. +``` diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index 8f1491dadbfa..29d8db9dc6b2 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -13,6 +13,7 @@ import ( newproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" @@ -341,7 +342,7 @@ func (s *Server) handleUpdateService( for _, nodeSnap := range snap.Nodes { // First register the node - skip the unchanged ones changed := true - if storedNode, ok := storedNodesMap[nodeSnap.Node.ID]; ok { + if storedNode, ok := storedNodesMap[nodeSnap.Node.Node]; ok { if storedNode.IsSame(nodeSnap.Node) { changed = false } @@ -357,7 +358,7 @@ func (s *Server) handleUpdateService( // Then register all services on that node - skip the unchanged ones for _, svcSnap := range nodeSnap.Services { changed = true - if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.ID, svcSnap.Service.ID)]; ok { + if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.Node, svcSnap.Service.ID)]; ok { if storedSvcInst.IsSame(svcSnap.Service) { changed = false } @@ -377,7 +378,7 @@ func (s *Server) handleUpdateService( for _, svcSnap := range nodeSnap.Services { for _, c := range svcSnap.Checks { changed := true - if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.ID, svcSnap.Service.ID, c.CheckID)]; ok { + if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.Node, svcSnap.Service.ID, c.CheckID)]; ok { if chk.IsSame(c) { changed = false } @@ -515,8 +516,10 @@ func (s *Server) handleUpdateService( // Delete any nodes that do not have any other services registered on them. for node := range unusedNodes { - nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()) - _, ns, err := s.GetStore().NodeServices(nil, node, nodeMeta, peerName) + // The wildcard is used here so that all services, regardless of namespace are returned + // by the following query. Without this, the node might accidentally be cleaned up early. + wildcardNSMeta := acl.NewEnterpriseMetaWithPartition(sn.PartitionOrDefault(), acl.WildcardName) + _, ns, err := s.GetStore().NodeServiceList(nil, node, &wildcardNSMeta, peerName) if err != nil { return fmt.Errorf("failed to query services on node: %w", err) } @@ -529,10 +532,10 @@ func (s *Server) handleUpdateService( err = s.Backend.CatalogDeregister(&structs.DeregisterRequest{ Node: node, PeerName: peerName, - EnterpriseMeta: *nodeMeta, + EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()), }) if err != nil { - ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node) + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", sn.PartitionOrDefault(), peerName, node) return fmt.Errorf("failed to deregister node %q: %w", ident, err) } } @@ -635,31 +638,35 @@ type nodeCheckIdentity struct { checkID string } -func makeNodeSvcInstID(nodeID types.NodeID, serviceID string) nodeSvcInstIdentity { +func makeNodeSvcInstID(node string, serviceID string) nodeSvcInstIdentity { return nodeSvcInstIdentity{ - nodeID: string(nodeID), + nodeID: node, serviceID: serviceID, } } -func makeNodeCheckID(nodeID types.NodeID, serviceID string, checkID types.CheckID) nodeCheckIdentity { +func makeNodeCheckID(node string, serviceID string, checkID types.CheckID) nodeCheckIdentity { return nodeCheckIdentity{ serviceID: serviceID, checkID: string(checkID), - nodeID: string(nodeID), + nodeID: node, } } -func buildStoredMap(storedInstances structs.CheckServiceNodes) (map[types.NodeID]*structs.Node, map[nodeSvcInstIdentity]*structs.NodeService, map[nodeCheckIdentity]*structs.HealthCheck) { - nodesMap := map[types.NodeID]*structs.Node{} +func buildStoredMap(storedInstances structs.CheckServiceNodes) ( + map[string]*structs.Node, + map[nodeSvcInstIdentity]*structs.NodeService, + map[nodeCheckIdentity]*structs.HealthCheck, +) { + nodesMap := map[string]*structs.Node{} svcInstMap := map[nodeSvcInstIdentity]*structs.NodeService{} checksMap := map[nodeCheckIdentity]*structs.HealthCheck{} for _, csn := range storedInstances { - nodesMap[csn.Node.ID] = csn.Node - svcInstMap[makeNodeSvcInstID(csn.Node.ID, csn.Service.ID)] = csn.Service + nodesMap[csn.Node.Node] = csn.Node + svcInstMap[makeNodeSvcInstID(csn.Node.Node, csn.Service.ID)] = csn.Service for _, chk := range csn.Checks { - checksMap[makeNodeCheckID(csn.Node.ID, csn.Service.ID, chk.CheckID)] = chk + checksMap[makeNodeCheckID(csn.Node.Node, csn.Service.ID, chk.CheckID)] = chk } } return nodesMap, svcInstMap, checksMap diff --git a/agent/grpc-external/services/peerstream/server.go b/agent/grpc-external/services/peerstream/server.go index e0dd4d759dfe..58e436bd1f5c 100644 --- a/agent/grpc-external/services/peerstream/server.go +++ b/agent/grpc-external/services/peerstream/server.go @@ -122,7 +122,7 @@ type StateStore interface { ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) - NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error) + NodeServiceList(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServiceList, error) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 140ae947dc5b..0a7d6812ab4b 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1594,7 +1594,11 @@ func Test_ExportedServicesCount(t *testing.T) { mst, err := srv.Tracker.Connected(peerID) require.NoError(t, err) - services := []string{"web", "api", "mongo"} + services := []string{ + structs.NewServiceName("web", nil).String(), + structs.NewServiceName("api", nil).String(), + structs.NewServiceName("mongo", nil).String(), + } update := cache.UpdateEvent{ CorrelationID: subExportedServiceList, Result: &pbpeerstream.ExportedServiceList{ @@ -1938,36 +1942,30 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test } } -func Test_processResponse_ExportedServiceUpdates(t *testing.T) { - srv, store := newTestServer(t, func(c *Config) { - backend := c.Backend.(*testStreamBackend) - backend.leader = func() bool { - return false - } - }) - - type testCase struct { - name string - seed []*structs.RegisterRequest - input *pbpeerstream.ExportedService - expect map[string]structs.CheckServiceNodes - exportedServices []string - } - - peerName := "billing" - peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" - remoteMeta := pbcommon.NewEnterpriseMetaFromStructs(*structs.DefaultEnterpriseMetaInPartition("billing-ap")) - - // "api" service is imported from the billing-ap partition, corresponding to the billing peer. - // Locally it is stored to the default partition. - defaultMeta := *acl.DefaultEnterpriseMeta() - apiSN := structs.NewServiceName("api", &defaultMeta) +type PeeringProcessResponse_testCase struct { + name string + seed []*structs.RegisterRequest + inputServiceName structs.ServiceName + input *pbpeerstream.ExportedService + expect map[structs.ServiceName]structs.CheckServiceNodes + exportedServices []string +} +func processResponse_ExportedServiceUpdates( + t *testing.T, + srv *testServer, + store *state.Store, + localEntMeta acl.EnterpriseMeta, + peerName string, + tests []PeeringProcessResponse_testCase, +) { // create a peering in the state store + peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" require.NoError(t, store.PeeringWrite(31, &pbpeering.PeeringWriteRequest{ Peering: &pbpeering.Peering{ - ID: peerID, - Name: peerName, + ID: peerID, + Name: peerName, + Partition: localEntMeta.PartitionOrDefault(), }, })) @@ -1975,7 +1973,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { mst, err := srv.Tracker.Connected(peerID) require.NoError(t, err) - run := func(t *testing.T, tc testCase) { + run := func(t *testing.T, tc PeeringProcessResponse_testCase) { // Seed the local catalog with some data to reconcile against. // and increment the tracker's imported services count var serviceNames []structs.ServiceName @@ -1989,14 +1987,14 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { in := &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedService, - ResourceID: apiSN.String(), + ResourceID: tc.inputServiceName.String(), Nonce: "1", Operation: pbpeerstream.Operation_OPERATION_UPSERT, Resource: makeAnyPB(t, tc.input), } // Simulate an update arriving for billing/api. - _, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, in) + _, err = srv.processResponse(peerName, localEntMeta.PartitionOrDefault(), mst, in) require.NoError(t, err) if len(tc.exportedServices) > 0 { @@ -2009,63 +2007,81 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { } // Simulate an update arriving for billing/api. - _, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, resp) + _, err = srv.processResponse(peerName, localEntMeta.PartitionOrDefault(), mst, resp) require.NoError(t, err) // Test the count and contents separately to ensure the count code path is hit. require.Equal(t, mst.GetImportedServicesCount(), len(tc.exportedServices)) require.ElementsMatch(t, mst.ImportedServices, tc.exportedServices) } - _, allServices, err := srv.GetStore().ServiceList(nil, &defaultMeta, peerName) + wildcardNS := acl.NewEnterpriseMetaWithPartition(localEntMeta.PartitionOrDefault(), acl.WildcardName) + _, allServices, err := srv.GetStore().ServiceList(nil, &wildcardNS, peerName) require.NoError(t, err) // This ensures that only services specified under tc.expect are stored. It includes // all exported services plus their sidecar proxies. for _, svc := range allServices { - _, ok := tc.expect[svc.Name] + _, ok := tc.expect[svc] require.True(t, ok) } for svc, expect := range tc.expect { - t.Run(svc, func(t *testing.T) { - _, got, err := srv.GetStore().CheckServiceNodes(nil, svc, &defaultMeta, peerName) + t.Run(svc.String(), func(t *testing.T) { + _, got, err := srv.GetStore().CheckServiceNodes(nil, svc.Name, &svc.EnterpriseMeta, peerName) require.NoError(t, err) requireEqualInstances(t, expect, got) }) } } - tt := []testCase{ + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func Test_processResponse_ExportedServiceUpdates(t *testing.T) { + peerName := "billing" + localEntMeta := *acl.DefaultEnterpriseMeta() + + remoteMeta := *structs.DefaultEnterpriseMetaInPartition("billing-ap") + pbRemoteMeta := pbcommon.NewEnterpriseMetaFromStructs(remoteMeta) + + apiLocalSN := structs.NewServiceName("api", &localEntMeta) + redisLocalSN := structs.NewServiceName("redis", &localEntMeta) + tests := []PeeringProcessResponse_testCase{ { name: "upsert two service instances to the same node", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, @@ -2074,42 +2090,42 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, }, }, }, - expect: map[string]structs.CheckServiceNodes{ - "api": { + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", // The remote billing-ap partition is overwritten for all resources with the local default. - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), // The name of the peer "billing" is attached as well. PeerName: peerName, @@ -2117,21 +2133,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2140,27 +2156,27 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2170,7 +2186,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "deleting a service with an empty exported service event", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2179,7 +2195,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2197,41 +2213,43 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, - input: &pbpeerstream.ExportedService{}, - expect: map[string]structs.CheckServiceNodes{ - "api": {}, + inputServiceName: structs.NewServiceName("api", &remoteMeta), + input: &pbpeerstream.ExportedService{}, + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): {}, }, }, { name: "upsert two service instances to different nodes", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, @@ -2240,60 +2258,60 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: &pbservice.Node{ ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", Node: "node-bar", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-bar-check", Node: "node-bar", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-bar", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, }, }, }, - expect: map[string]structs.CheckServiceNodes{ - "api": { + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", Node: "node-bar", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-bar-check", Node: "node-bar", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-bar", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2304,7 +2322,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: "node-foo", // The remote billing-ap partition is overwritten for all resources with the local default. - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), // The name of the peer "billing" is attached as well. PeerName: peerName, @@ -2312,21 +2330,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2336,7 +2354,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "deleting one service name from a node does not delete other service names", - exportedServices: []string{"api", "redis"}, + exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2345,7 +2363,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2369,7 +2387,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2387,37 +2405,38 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), // Nil input is for the "api" service. input: &pbpeerstream.ExportedService{}, - expect: map[string]structs.CheckServiceNodes{ - "api": {}, + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): {}, // Existing redis service was not affected by deletion. - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2435,7 +2454,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2459,7 +2478,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2-sidecar-proxy", Service: "redis-sidecar-proxy", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2483,7 +2502,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2507,7 +2526,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1-sidecar-proxy", Service: "api-sidecar-proxy", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2526,68 +2545,69 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), // Nil input is for the "api" service. input: &pbpeerstream.ExportedService{}, - exportedServices: []string{"redis"}, - expect: map[string]structs.CheckServiceNodes{ + exportedServices: []string{redisLocalSN.String()}, + expect: map[structs.ServiceName]structs.CheckServiceNodes{ // Existing redis service was not affected by deletion. - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, }, }, - "redis-sidecar-proxy": { + structs.NewServiceName("redis-sidecar-proxy", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2-sidecar-proxy", Service: "redis-sidecar-proxy", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "redis-2-sidecar-proxy-check", ServiceID: "redis-2-sidecar-proxy", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2597,7 +2617,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "service checks are cleaned up when not present in a response", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2606,7 +2626,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2624,19 +2644,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ @@ -2645,20 +2666,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, - expect: map[string]structs.CheckServiceNodes{ + expect: map[structs.ServiceName]structs.CheckServiceNodes{ // Service check should be gone - "api": { + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{}, @@ -2668,7 +2689,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "node checks are cleaned up when not present in a response", - exportedServices: []string{"api", "redis"}, + exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2677,7 +2698,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2701,7 +2722,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2719,19 +2740,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ @@ -2740,27 +2762,27 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, }, }, }, - expect: map[string]structs.CheckServiceNodes{ + expect: map[structs.ServiceName]structs.CheckServiceNodes{ // Node check should be gone - "api": { + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2768,24 +2790,24 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, }, }, - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2793,7 +2815,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2803,7 +2825,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "replacing a service instance on a node cleans up the old instance", - exportedServices: []string{"api", "redis"}, + exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2812,7 +2834,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2836,7 +2858,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2854,20 +2876,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, // New service ID and checks for the api service. Service: &pbservice.NodeService{ ID: "new-api-v2", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ @@ -2886,19 +2909,19 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, - expect: map[string]structs.CheckServiceNodes{ - "api": { + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "new-api-v2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2911,24 +2934,24 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "new-api-v2-check", ServiceID: "new-api-v2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, }, }, - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2941,7 +2964,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2950,12 +2973,13 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - run(t, tc) - }) - } + srv, store := newTestServer(t, func(c *Config) { + backend := c.Backend.(*testStreamBackend) + backend.leader = func() bool { + return false + } + }) + processResponse_ExportedServiceUpdates(t, srv, store, localEntMeta, peerName, tests) } // TestLogTraceProto tests that all PB trace log helpers redact the diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index a93457b92546..abb5a003a399 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -372,9 +372,8 @@ func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName) defer s.mu.Unlock() s.ImportedServices = make([]string, len(serviceNames)) - for i, sn := range serviceNames { - s.ImportedServices[i] = sn.Name + s.ImportedServices[i] = sn.String() } } @@ -392,7 +391,7 @@ func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName) s.ExportedServices = make([]string, len(serviceNames)) for i, sn := range serviceNames { - s.ExportedServices[i] = sn.Name + s.ExportedServices[i] = sn.String() } }