Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
freddygv committed Jun 7, 2022
1 parent 5564fcc commit db7d8a8
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
31 changes: 19 additions & 12 deletions agent/rpc/peering/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (s *Service) handleUpdateService(

structsNodes, err := pbNodes.CheckServiceNodesToStruct()
if err != nil {
return fmt.Errorf("failed to convert protobuf instances to structs")
return fmt.Errorf("failed to convert protobuf instances to structs: %w", err)
}

// Normalize the data into a convenient form for operation.
Expand Down Expand Up @@ -305,7 +305,7 @@ func (s *Service) handleUpdateService(
// Missing nodes are not assumed to be deleted because there may be other service names
// registered on them.
// Inside we also track a map of node checks associated with the node.
unusedNodes = make(map[types.NodeID]struct{})
unusedNodes = make(map[string]struct{})

// deletedNodeChecks tracks node checks that were not present in the latest response.
// A single node check will be attached to all service instances of a node, so this
Expand All @@ -314,7 +314,7 @@ func (s *Service) handleUpdateService(
)
for _, csn := range storedInstances {
if _, ok := snap.Nodes[csn.Node.ID]; !ok {
unusedNodes[csn.Node.ID] = struct{}{}
unusedNodes[string(csn.Node.ID)] = struct{}{}

// Since the node is not in the snapshot we can know the associated service
// instance is not in the snapshot either, since a service instance can't
Expand All @@ -327,7 +327,7 @@ func (s *Service) handleUpdateService(
PeerName: peerName,
})
if err != nil {
return fmt.Errorf("failed to deregister service: %w", err)
return fmt.Errorf("failed to deregister service %q: %w", csn.Service.CompoundServiceID(), err)
}

// We can't know if a node check was deleted from the exporting cluster
Expand All @@ -347,7 +347,8 @@ func (s *Service) handleUpdateService(
PeerName: peerName,
})
if err != nil {
return fmt.Errorf("failed to deregister service: %w", err)
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/service_id:%s", csn.Service.PartitionOrDefault(), peerName, csn.Node.Node, csn.Service.ID)
return fmt.Errorf("failed to deregister service %q: %w", ident, err)
}

// When a service is deleted all associated checks also get deleted as a side effect.
Expand Down Expand Up @@ -379,28 +380,32 @@ func (s *Service) handleUpdateService(
PeerName: peerName,
})
if err != nil {
return fmt.Errorf("failed to deregister check: %w", err)
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/check_id:%s", chk.PartitionOrDefault(), peerName, chk.Node, chk.CheckID)
return fmt.Errorf("failed to deregister check %q: %w", ident, err)
}
}
}
}

// Delete all deduplicated node checks.
for chk := range deletedNodeChecks {
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{
Node: chk.node,
CheckID: chk.checkID,
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()),
EnterpriseMeta: *nodeMeta,
PeerName: peerName,
})
if err != nil {
return fmt.Errorf("failed to deregister check: %w", err)
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/check_id:%s", nodeMeta.PartitionOrDefault(), peerName, chk.node, chk.checkID)
return fmt.Errorf("failed to deregister node check %q: %w", ident, err)
}
}

// Delete any nodes that do not have any other services registered on them.
for node := range unusedNodes {
_, ns, err := s.Backend.Store().NodeServices(nil, string(node), &sn.EnterpriseMeta, peerName)
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
_, ns, err := s.Backend.Store().NodeServices(nil, node, nodeMeta, peerName)
if err != nil {
return fmt.Errorf("failed to query services on node: %w", err)
}
Expand All @@ -411,11 +416,13 @@ func (s *Service) handleUpdateService(

// All services on the node were deleted, so the node is also cleaned up.
err = s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{
Node: string(node),
PeerName: peerName,
Node: node,
PeerName: peerName,
EnterpriseMeta: *nodeMeta,
})
if err != nil {
return fmt.Errorf("failed to deregister node: %w", err)
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node)
return fmt.Errorf("failed to deregister node %q: %w", ident, err)
}
}
return nil
Expand Down
1 change: 0 additions & 1 deletion agent/rpc/peering/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,7 +1428,6 @@ func TestHandleUpdateService(t *testing.T) {
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
PeerName: peerName,
Address: "10.0.0.1",
},
Service: &structs.NodeService{
ID: "api-2",
Expand Down

0 comments on commit db7d8a8

Please sign in to comment.