
Fix issue with peer stream node cleanup. (#17235)

This commit fixes a few closely related problems that sit near each other in
the code.

1. The peerstream code uses node IDs in several places to determine which
nodes / services / checks should be cleaned up or created. While VM deployments
with agents will almost always have a node ID, agentless deployments use
synthetic nodes and do not populate the field. As a result, for consul-k8s
deployments all services were likely bundled together into the same synthetic
node in some code paths (but not all), resulting in strange behavior. The
Node.Node field should be used as the unique identifier instead, since it
should always be populated (a minimal sketch follows this list).

2. The peerstream cleanup process for unused nodes uses an incorrect query for
node deregistration. This query is NOT namespace aware, so a node (and its
corresponding services) is deregistered prematurely whenever it has zero
default-namespace services and one or more non-default-namespace services
registered on it. This issue is tricky to find because of the incorrect logic
described in point 1, combined with the fact that the affected services must be
co-located on the same node as the service currently being deregistered for the
problem to surface.

3. The stream tracker did not distinguish between services in different
namespaces and could therefore report incorrect numbers. It was updated to use
the full service name, avoiding conflicts between namespaces and returning
correct results.
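
To illustrate point 1, the following is a minimal, hypothetical Go sketch with
simplified stand-in types (not the actual Consul structs or peerstream code):
keying stored nodes by node ID collapses every synthetic agentless node, whose
ID is empty, into a single map entry, while keying by node name keeps each
node distinct.

```go
package main

import "fmt"

// Node is a simplified stand-in for the catalog node record.
type Node struct {
	ID   string // may be empty for synthetic (agentless) nodes
	Node string // node name; expected to always be populated
}

// keyByID indexes nodes by ID; all synthetic nodes collide on the empty string.
func keyByID(nodes []Node) map[string]Node {
	m := map[string]Node{}
	for _, n := range nodes {
		m[n.ID] = n
	}
	return m
}

// keyByName indexes nodes by name, keeping each synthetic node distinct.
func keyByName(nodes []Node) map[string]Node {
	m := map[string]Node{}
	for _, n := range nodes {
		m[n.Node] = n
	}
	return m
}

func main() {
	nodes := []Node{
		{ID: "", Node: "virtual-node-a"},
		{ID: "", Node: "virtual-node-b"},
	}
	fmt.Println(len(keyByID(nodes)))   // 1: both synthetic nodes collapse into one entry
	fmt.Println(len(keyByName(nodes))) // 2: node names stay unique
}
```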
hashi-derek authored May 8, 2023
1 parent 6fa1044 commit 50ef6a6
Showing 5 changed files with 201 additions and 168 deletions.
3 changes: 3 additions & 0 deletions .changelog/17235.txt
@@ -0,0 +1,3 @@
+ ```release-note:bug
+ peering: Fix issue where peer streams could incorrectly deregister services in various scenarios.
+ ```
39 changes: 23 additions & 16 deletions agent/grpc-external/services/peerstream/replication.go
@@ -13,6 +13,7 @@ import (
newproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
@@ -341,7 +342,7 @@ func (s *Server) handleUpdateService(
for _, nodeSnap := range snap.Nodes {
// First register the node - skip the unchanged ones
changed := true
- if storedNode, ok := storedNodesMap[nodeSnap.Node.ID]; ok {
+ if storedNode, ok := storedNodesMap[nodeSnap.Node.Node]; ok {
if storedNode.IsSame(nodeSnap.Node) {
changed = false
}
@@ -357,7 +358,7 @@
// Then register all services on that node - skip the unchanged ones
for _, svcSnap := range nodeSnap.Services {
changed = true
- if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.ID, svcSnap.Service.ID)]; ok {
+ if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.Node, svcSnap.Service.ID)]; ok {
if storedSvcInst.IsSame(svcSnap.Service) {
changed = false
}
@@ -377,7 +378,7 @@
for _, svcSnap := range nodeSnap.Services {
for _, c := range svcSnap.Checks {
changed := true
- if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.ID, svcSnap.Service.ID, c.CheckID)]; ok {
+ if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.Node, svcSnap.Service.ID, c.CheckID)]; ok {
if chk.IsSame(c) {
changed = false
}
@@ -515,8 +516,10 @@ func (s *Server) handleUpdateService(

// Delete any nodes that do not have any other services registered on them.
for node := range unusedNodes {
- nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
- _, ns, err := s.GetStore().NodeServices(nil, node, nodeMeta, peerName)
+ // The wildcard is used here so that all services, regardless of namespace are returned
+ // by the following query. Without this, the node might accidentally be cleaned up early.
+ wildcardNSMeta := acl.NewEnterpriseMetaWithPartition(sn.PartitionOrDefault(), acl.WildcardName)
+ _, ns, err := s.GetStore().NodeServiceList(nil, node, &wildcardNSMeta, peerName)
if err != nil {
return fmt.Errorf("failed to query services on node: %w", err)
}
@@ -529,10 +532,10 @@
err = s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: node,
PeerName: peerName,
- EnterpriseMeta: *nodeMeta,
+ EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()),
})
if err != nil {
- ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node)
+ ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", sn.PartitionOrDefault(), peerName, node)
return fmt.Errorf("failed to deregister node %q: %w", ident, err)
}
}
@@ -635,31 +638,35 @@ type nodeCheckIdentity struct {
checkID string
}

- func makeNodeSvcInstID(nodeID types.NodeID, serviceID string) nodeSvcInstIdentity {
+ func makeNodeSvcInstID(node string, serviceID string) nodeSvcInstIdentity {
return nodeSvcInstIdentity{
- nodeID: string(nodeID),
+ nodeID: node,
serviceID: serviceID,
}
}

- func makeNodeCheckID(nodeID types.NodeID, serviceID string, checkID types.CheckID) nodeCheckIdentity {
+ func makeNodeCheckID(node string, serviceID string, checkID types.CheckID) nodeCheckIdentity {
return nodeCheckIdentity{
serviceID: serviceID,
checkID: string(checkID),
- nodeID: string(nodeID),
+ nodeID: node,
}
}

- func buildStoredMap(storedInstances structs.CheckServiceNodes) (map[types.NodeID]*structs.Node, map[nodeSvcInstIdentity]*structs.NodeService, map[nodeCheckIdentity]*structs.HealthCheck) {
- nodesMap := map[types.NodeID]*structs.Node{}
+ func buildStoredMap(storedInstances structs.CheckServiceNodes) (
+ map[string]*structs.Node,
+ map[nodeSvcInstIdentity]*structs.NodeService,
+ map[nodeCheckIdentity]*structs.HealthCheck,
+ ) {
+ nodesMap := map[string]*structs.Node{}
svcInstMap := map[nodeSvcInstIdentity]*structs.NodeService{}
checksMap := map[nodeCheckIdentity]*structs.HealthCheck{}

for _, csn := range storedInstances {
- nodesMap[csn.Node.ID] = csn.Node
- svcInstMap[makeNodeSvcInstID(csn.Node.ID, csn.Service.ID)] = csn.Service
+ nodesMap[csn.Node.Node] = csn.Node
+ svcInstMap[makeNodeSvcInstID(csn.Node.Node, csn.Service.ID)] = csn.Service
for _, chk := range csn.Checks {
- checksMap[makeNodeCheckID(csn.Node.ID, csn.Service.ID, chk.CheckID)] = chk
+ checksMap[makeNodeCheckID(csn.Node.Node, csn.Service.ID, chk.CheckID)] = chk
}
}
return nodesMap, svcInstMap, checksMap
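As a rough illustration of the cleanup fix above, the sketch below uses
simplified stand-ins rather than Consul's state store API: a node should only
be deregistered when a query across all namespaces finds no remaining services
on it, whereas a namespace-scoped query can miss services in other namespaces
and trigger a premature deregistration.

```go
package main

import "fmt"

// service is a simplified stand-in for a registered service instance.
type service struct {
	Node      string
	Namespace string
	Name      string
}

type store struct{ services []service }

// servicesOnNode returns the services registered on a node. An empty namespace
// acts as a wildcard; a concrete namespace filters to that namespace only.
func (s *store) servicesOnNode(node, namespace string) []service {
	var out []service
	for _, svc := range s.services {
		if svc.Node != node {
			continue
		}
		if namespace != "" && svc.Namespace != namespace {
			continue
		}
		out = append(out, svc)
	}
	return out
}

func main() {
	st := &store{services: []service{
		{Node: "node-1", Namespace: "team-a", Name: "api"},
	}}

	// Namespace-scoped query (the old behavior): sees zero default-namespace
	// services and would wrongly treat the node as unused.
	fmt.Println(len(st.servicesOnNode("node-1", "default"))) // 0 -> premature deregistration

	// Wildcard query (the fixed behavior): still finds the team-a service, so
	// the node is kept.
	fmt.Println(len(st.servicesOnNode("node-1", ""))) // 1 -> node stays registered
}
```
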
2 changes: 1 addition & 1 deletion agent/grpc-external/services/peerstream/server.go
@@ -122,7 +122,7 @@ type StateStore interface {
ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error)
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
- NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error)
+ NodeServiceList(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServiceList, error)
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error)
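The stream tracker change described in point 3 of the commit message lives in
files that are not shown in this excerpt. As a hypothetical sketch of the idea
(the types below are illustrative, not the tracker's actual ones), counting
exported services by a namespace-qualified key avoids collisions between
same-named services in different namespaces:

```go
package main

import "fmt"

// serviceName is a simplified stand-in for a namespace-qualified service name.
type serviceName struct {
	Namespace string
	Name      string
}

func (s serviceName) String() string { return s.Namespace + "/" + s.Name }

func main() {
	exported := []serviceName{
		{Namespace: "default", Name: "web"},
		{Namespace: "team-a", Name: "web"},
	}

	byName := map[string]struct{}{}     // keyed by bare name: entries collide
	byFullName := map[string]struct{}{} // keyed by full name: entries stay distinct
	for _, sn := range exported {
		byName[sn.Name] = struct{}{}
		byFullName[sn.String()] = struct{}{}
	}
	fmt.Println(len(byName), len(byFullName)) // 1 2
}
```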
