Clean up imported nodes/services/checks as needed (#13367)
Previously, imported data would never be deleted. As
nodes/services/checks were registered and deregistered, resources
deleted from the exporting cluster would accumulate in the importing
cluster.

This commit updates replication so that whenever an update is received
for a service name, what is present in the catalog is reconciled against
what was received.

The new handleUpdateService method handles both updates and deletions.
freddygv authored Jun 13, 2022
1 parent edbf19f commit 71b2545
Showing 5 changed files with 1,083 additions and 27 deletions.
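The core change is a reconcile-on-update pattern: each time the importing cluster receives the set of instances for a service name, it re-applies what was received and then deregisters anything still in its catalog that the exporting side no longer reports. Below is a minimal, self-contained sketch of that pattern; the types and names (Instance, reconcile) are hypothetical and simplified, not Consul's actual API — the real implementation is in the replication.go diff further down.

package main

import "fmt"

// Instance is a hypothetical, simplified stand-in for a catalog entry
// identified by node and service ID.
type Instance struct {
	Node      string
	ServiceID string
}

// key uniquely identifies an instance for set-membership checks.
func key(i Instance) string { return i.Node + "/" + i.ServiceID }

// reconcile re-applies everything in `received` and flags anything that is
// stored locally but absent from the received snapshot for deregistration.
// Passing an empty `received` slice therefore deletes the whole service.
func reconcile(stored, received []Instance) (register, deregister []Instance) {
	want := make(map[string]struct{}, len(received))
	for _, inst := range received {
		want[key(inst)] = struct{}{}
		register = append(register, inst) // upserts are idempotent re-applies
	}
	for _, inst := range stored {
		if _, ok := want[key(inst)]; !ok {
			deregister = append(deregister, inst) // gone upstream: clean it up
		}
	}
	return register, deregister
}

func main() {
	stored := []Instance{{"node1", "web-1"}, {"node2", "web-2"}}
	received := []Instance{{"node1", "web-1"}} // web-2 was deleted upstream
	reg, dereg := reconcile(stored, received)
	fmt.Println(len(reg), "to register,", len(dereg), "to deregister")
}

The real handleUpdateService applies the same idea at three levels — service instances, node checks, and nodes — so that a node is only deregistered once no other imported services remain on it.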
5 changes: 5 additions & 0 deletions agent/consul/peering_backend.go
@@ -169,5 +169,10 @@ func (a *peeringApply) CatalogRegister(req *structs.RegisterRequest) error {
return err
}

func (a *peeringApply) CatalogDeregister(req *structs.DeregisterRequest) error {
_, err := a.srv.leaderRaftApply("Catalog.Deregister", structs.DeregisterRequestType, req)
return err
}

var _ peering.Apply = (*peeringApply)(nil)
var _ peering.LeaderAddress = (*leaderAddr)(nil)
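The `var _ peering.Apply = (*peeringApply)(nil)` line above is the standard Go compile-time assertion: the build fails if *peeringApply stops satisfying the interface, which is what would happen if CatalogDeregister were added to peering.Apply (see the service.go hunk below) without the backend method above. A generic sketch of the idiom, with hypothetical names:

package main

// Applier is a hypothetical interface standing in for peering.Apply.
type Applier interface {
	CatalogRegister(name string) error
	CatalogDeregister(name string) error
}

// backend is a hypothetical implementation.
type backend struct{}

func (b *backend) CatalogRegister(name string) error   { return nil }
func (b *backend) CatalogDeregister(name string) error { return nil }

// Compile-time check: if backend ever stops implementing Applier
// (for example, a new method is added to the interface), this line
// fails to compile instead of surfacing the problem at runtime.
var _ Applier = (*backend)(nil)

func main() {}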
185 changes: 158 additions & 27 deletions agent/rpc/peering/replication.go
@@ -7,6 +7,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/protobuf/types/known/anypb"
@@ -209,7 +210,7 @@ func (s *Service) handleUpsert(
return fmt.Errorf("failed to unmarshal resource: %w", err)
}

return s.handleUpsertService(peerName, partition, sn, csn)
return s.handleUpdateService(peerName, partition, sn, csn)

case pbpeering.TypeURLRoots:
roots := &pbpeering.PeeringTrustBundle{}
@@ -224,24 +225,29 @@
}
}

func (s *Service) handleUpsertService(
// handleUpdateService handles both deletion and upsert events for a service.
// On an UPSERT event:
// - All nodes, services, checks in the input pbNodes are re-applied through Raft.
// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted.
//
// On a DELETE event:
// - A reconciliation against nil or empty input pbNodes leads to deleting all stored catalog resources
// associated with the service name.
func (s *Service) handleUpdateService(
peerName string,
partition string,
sn structs.ServiceName,
csn *pbservice.IndexedCheckServiceNodes,
pbNodes *pbservice.IndexedCheckServiceNodes,
) error {
if csn == nil || len(csn.Nodes) == 0 {
return s.handleDeleteService(peerName, partition, sn)
// Capture instances in the state store for reconciliation later.
_, storedInstances, err := s.Backend.Store().CheckServiceNodes(nil, sn.Name, &sn.EnterpriseMeta, peerName)
if err != nil {
return fmt.Errorf("failed to read imported services: %w", err)
}

// Convert exported data into structs format.
structsNodes := make([]structs.CheckServiceNode, 0, len(csn.Nodes))
for _, pb := range csn.Nodes {
instance, err := pbservice.CheckServiceNodeToStructs(pb)
if err != nil {
return fmt.Errorf("failed to convert instance: %w", err)
}
structsNodes = append(structsNodes, *instance)
structsNodes, err := pbNodes.CheckServiceNodesToStruct()
if err != nil {
return fmt.Errorf("failed to convert protobuf instances to structs: %w", err)
}

// Normalize the data into a convenient form for operation.
@@ -277,8 +283,145 @@ func (s *Service) handleUpsertService(
}
}

// TODO(peering): cleanup and deregister existing data that is now missing safely somehow
//
// Now that the data received has been stored in the state store, the rest of this
// function is responsible for cleaning up data in the catalog that wasn't in the snapshot.
//

// nodeCheckTuple uniquely identifies a node check in the catalog.
// The partition is not needed because we are only operating on one partition's catalog.
type nodeCheckTuple struct {
checkID types.CheckID
node string
}

var (
// unusedNodes tracks node names that were not present in the latest response.
// Missing nodes are not assumed to be deleted because there may be other service names
// registered on them.
// Inside we also track a map of node checks associated with the node.
unusedNodes = make(map[string]struct{})

// deletedNodeChecks tracks node checks that were not present in the latest response.
// A single node check will be attached to all service instances of a node, so this
// deduplication prevents issuing multiple deregistrations for a single check.
deletedNodeChecks = make(map[nodeCheckTuple]struct{})
)
for _, csn := range storedInstances {
if _, ok := snap.Nodes[csn.Node.ID]; !ok {
unusedNodes[string(csn.Node.ID)] = struct{}{}

// Since the node is not in the snapshot we can know the associated service
// instance is not in the snapshot either, since a service instance can't
// exist without a node.
// This will also delete all service checks.
err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{
Node: csn.Node.Node,
ServiceID: csn.Service.ID,
EnterpriseMeta: csn.Service.EnterpriseMeta,
PeerName: peerName,
})
if err != nil {
return fmt.Errorf("failed to deregister service %q: %w", csn.Service.CompoundServiceID(), err)
}

// We can't know if a node check was deleted from the exporting cluster
// (but not the node itself) if the node wasn't in the snapshot,
// so we do not loop over checks here.
// If the unusedNode gets deleted below that will also delete node checks.
continue
}

// Delete the service instance if not in the snapshot.
sid := csn.Service.CompoundServiceID()
if _, ok := snap.Nodes[csn.Node.ID].Services[sid]; !ok {
err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{
Node: csn.Node.Node,
ServiceID: csn.Service.ID,
EnterpriseMeta: csn.Service.EnterpriseMeta,
PeerName: peerName,
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/service_id:%s",
csn.Service.PartitionOrDefault(), peerName, csn.Node.Node, csn.Service.NamespaceOrDefault(), csn.Service.ID)
return fmt.Errorf("failed to deregister service %q: %w", ident, err)
}

// When a service is deleted all associated checks also get deleted as a side effect.
continue
}

// Reconcile checks.
for _, chk := range csn.Checks {
if _, ok := snap.Nodes[csn.Node.ID].Services[sid].Checks[chk.CheckID]; !ok {
// Checks without a ServiceID are node checks.
// If the node exists but the check does not then the check was deleted.
if chk.ServiceID == "" {
// Deduplicate node checks to avoid deregistering a check multiple times.
tuple := nodeCheckTuple{
checkID: chk.CheckID,
node: chk.Node,
}
deletedNodeChecks[tuple] = struct{}{}
continue
}

// If the check isn't a node check then it's a service check.
// Service checks that were not present can be deleted immediately because
// checks for a given service ID will only be attached to a single CheckServiceNode.
err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{
Node: chk.Node,
CheckID: chk.CheckID,
EnterpriseMeta: chk.EnterpriseMeta,
PeerName: peerName,
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/check_id:%s",
chk.PartitionOrDefault(), peerName, chk.Node, chk.NamespaceOrDefault(), chk.CheckID)
return fmt.Errorf("failed to deregister check %q: %w", ident, err)
}
}
}
}

// Delete all deduplicated node checks.
for chk := range deletedNodeChecks {
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{
Node: chk.node,
CheckID: chk.checkID,
EnterpriseMeta: *nodeMeta,
PeerName: peerName,
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/check_id:%s", nodeMeta.PartitionOrDefault(), peerName, chk.node, chk.checkID)
return fmt.Errorf("failed to deregister node check %q: %w", ident, err)
}
}

// Delete any nodes that do not have any other services registered on them.
for node := range unusedNodes {
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
_, ns, err := s.Backend.Store().NodeServices(nil, node, nodeMeta, peerName)
if err != nil {
return fmt.Errorf("failed to query services on node: %w", err)
}
if ns != nil && len(ns.Services) >= 1 {
// At least one service is still registered on this node, so we keep it.
continue
}

// All services on the node were deleted, so the node is also cleaned up.
err = s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{
Node: node,
PeerName: peerName,
EnterpriseMeta: *nodeMeta,
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node)
return fmt.Errorf("failed to deregister node %q: %w", ident, err)
}
}
return nil
}

@@ -307,25 +450,13 @@ func (s *Service) handleDelete(
case pbpeering.TypeURLService:
sn := structs.ServiceNameFromString(resourceID)
sn.OverridePartition(partition)
return s.handleDeleteService(peerName, partition, sn)
return s.handleUpdateService(peerName, partition, sn, nil)

default:
return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
}
}

func (s *Service) handleDeleteService(
peerName string,
partition string,
sn structs.ServiceName,
) error {
// Deregister: ServiceID == DeleteService ANd checks
// Deregister: ServiceID(empty) CheckID(empty) == DeleteNode

// TODO(peering): implement
return nil
}

func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage {
var rpcErr *pbstatus.Status
if errCode != code.Code_OK || errMsg != "" {
3 changes: 3 additions & 0 deletions agent/rpc/peering/service.go
@@ -130,6 +130,8 @@ type Store interface {
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error)
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error)
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
AbandonCh() <-chan struct{}
@@ -142,6 +144,7 @@ type Apply interface {
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
CatalogRegister(req *structs.RegisterRequest) error
CatalogDeregister(req *structs.DeregisterRequest) error
}

// GenerateToken implements the PeeringService RPC method to generate a
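One way to exercise the new cleanup path in isolation is to hand the service a fake backend that records deregistrations instead of applying them through Raft. The sketch below is hypothetical and covers only the two catalog methods shown above; Consul's real Apply interface (and its actual test doubles) has more methods and different wiring.

package main

import "fmt"

// RegisterRequest / DeregisterRequest are simplified stand-ins for
// structs.RegisterRequest and structs.DeregisterRequest.
type RegisterRequest struct{ Node, ServiceID string }
type DeregisterRequest struct{ Node, ServiceID, CheckID string }

// catalogApplier is a narrowed, hypothetical slice of the Apply interface:
// just the two catalog methods the reconciliation code depends on.
type catalogApplier interface {
	CatalogRegister(req *RegisterRequest) error
	CatalogDeregister(req *DeregisterRequest) error
}

// fakeApply records every request so a test can assert exactly what the
// cleanup logic registered and deregistered.
type fakeApply struct {
	registered   []RegisterRequest
	deregistered []DeregisterRequest
}

func (f *fakeApply) CatalogRegister(req *RegisterRequest) error {
	f.registered = append(f.registered, *req)
	return nil
}

func (f *fakeApply) CatalogDeregister(req *DeregisterRequest) error {
	f.deregistered = append(f.deregistered, *req)
	return nil
}

func main() {
	fake := &fakeApply{}
	var apply catalogApplier = fake
	_ = apply.CatalogDeregister(&DeregisterRequest{Node: "node2", ServiceID: "web-2"})
	fmt.Println(len(fake.deregistered), "deregistration recorded")
}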
