Skip to content

Commit

Permalink
allow setting locality on services and nodes (#16581)
Browse files Browse the repository at this point in the history
  • Loading branch information
erichaberkorn authored Mar 10, 2023
1 parent 40312ac commit 57e2493
Show file tree
Hide file tree
Showing 44 changed files with 1,169 additions and 927 deletions.
1 change: 1 addition & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func LocalConfig(cfg *config.RuntimeConfig) local.Config {
DiscardCheckOutput: cfg.DiscardCheckOutput,
NodeID: cfg.NodeID,
NodeName: cfg.NodeName,
NodeLocality: cfg.StructLocality(),
Partition: cfg.PartitionOrDefault(),
TaggedAddresses: map[string]string{},
}
Expand Down
1 change: 1 addition & 0 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func buildAgentService(s *structs.NodeService, dc string) api.AgentService {
ModifyIndex: s.ModifyIndex,
Weights: weights,
Datacenter: dc,
Locality: s.Locality.ToAPI(),
}

if as.Tags == nil {
Expand Down
1 change: 1 addition & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func testAgent_AddService(t *testing.T, extraHCL string) {
Tags: []string{"tag1"},
Weights: nil, // nil weights...
Port: 8100,
Locality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
// ... should be populated to avoid "IsSame" returning true during AE.
Expand Down
2 changes: 1 addition & 1 deletion agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ type Config struct {
LeaveOnTerm *bool `mapstructure:"leave_on_terminate" json:"leave_on_terminate,omitempty"`
LicensePath *string `mapstructure:"license_path" json:"license_path,omitempty"`
Limits Limits `mapstructure:"limits" json:"-"`
Locality Locality `mapstructure:"locality" json:"-"`
Locality *Locality `mapstructure:"locality" json:"-"`
LogLevel *string `mapstructure:"log_level" json:"log_level,omitempty"`
LogJSON *bool `mapstructure:"log_json" json:"log_json,omitempty"`
LogFile *string `mapstructure:"log_file" json:"log_file,omitempty"`
Expand Down
10 changes: 7 additions & 3 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ type RuntimeConfig struct {
// hcl: leave_on_terminate = (true|false)
LeaveOnTerm bool

Locality Locality
Locality *Locality

// Logging configuration used to initialize agent logging.
Logging logging.Config
Expand Down Expand Up @@ -1715,8 +1715,12 @@ func (c *RuntimeConfig) VersionWithMetadata() string {
return version
}

func (c *RuntimeConfig) StructLocality() structs.Locality {
return structs.Locality{
// StructLocality converts the RuntimeConfig Locality to a struct Locality.
func (c *RuntimeConfig) StructLocality() *structs.Locality {
if c.Locality == nil {
return nil
}
return &structs.Locality{
Region: stringVal(c.Locality.Region),
Zone: stringVal(c.Locality.Zone),
}
Expand Down
2 changes: 1 addition & 1 deletion agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7092,7 +7092,7 @@ func TestRuntimeConfig_Sanitize(t *testing.T) {
},
},
},
Locality: Locality{Region: strPtr("us-west-1"), Zone: strPtr("us-west-1a")},
Locality: &Locality{Region: strPtr("us-west-1"), Zone: strPtr("us-west-1a")},
}

b, err := json.MarshalIndent(rt.Sanitized(), "", " ")
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@
"EnterpriseMeta": {},
"ID": "",
"Kind": "",
"Locality": null,
"Meta": {},
"Name": "foo",
"Port": 0,
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ type Config struct {

PeeringTestAllowPeerRegistrations bool

Locality structs.Locality
Locality *structs.Locality

// Embedded Consul Enterprise specific configuration
*EnterpriseConfig
Expand Down
3 changes: 2 additions & 1 deletion agent/consul/leader_peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/private/pbcommon"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/proto/private/pbpeerstream"
)
Expand Down Expand Up @@ -385,7 +386,7 @@ func (s *Server) establishStream(ctx context.Context,
Remote: &pbpeering.RemoteInfo{
Partition: peer.Partition,
Datacenter: s.config.Datacenter,
Locality: pbpeering.LocalityFromStruct(s.config.Locality),
Locality: pbcommon.LocalityToProto(s.config.Locality),
},
},
},
Expand Down
9 changes: 5 additions & 4 deletions agent/consul/leader_peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/private/pbcommon"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil"
Expand Down Expand Up @@ -661,7 +662,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
t.Skip("too slow for testing.Short")
}

acceptorLocality := structs.Locality{
acceptorLocality := &structs.Locality{
Region: "us-west-2",
Zone: "us-west-2a",
}
Expand Down Expand Up @@ -689,7 +690,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

dialerLocality := structs.Locality{
dialerLocality := &structs.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
}
Expand Down Expand Up @@ -755,7 +756,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "dc1", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
require.Equal(t, pbpeering.LocalityFromStruct(acceptorLocality), p.Peering.Remote.Locality)
require.Equal(t, pbcommon.LocalityToProto(acceptorLocality), p.Peering.Remote.Locality)

// Retry fetching the until the peering is active in the acceptor.
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -771,7 +772,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
require.NotNil(t, p)
require.Equal(t, "dc2", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
require.Equal(t, pbpeering.LocalityFromStruct(dialerLocality), p.Peering.Remote.Locality)
require.Equal(t, pbcommon.LocalityToProto(dialerLocality), p.Peering.Remote.Locality)
}

// Test that the dialing peer attempts to reestablish connections when the accepting peer
Expand Down
1 change: 1 addition & 0 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
TaggedAddresses: req.TaggedAddresses,
Meta: req.NodeMeta,
PeerName: req.PeerName,
Locality: req.Locality,
}
if preserveIndexes {
node.CreateIndex = req.CreateIndex
Expand Down
23 changes: 15 additions & 8 deletions agent/consul/state/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ func TestStateStore_GetNodeID(t *testing.T) {

_, out, err := s.GetNodeID(types.NodeID("wrongId"), nil, "")
if err == nil || out != nil || !strings.Contains(err.Error(), "node lookup by ID failed: index error: UUID (without hyphens) must be") {
t.Errorf("want an error, nil value, err:=%q ; out:=%q", err.Error(), out)
t.Errorf("want an error, nil value, err:=%q ; out:=%+v", err.Error(), out)
}
_, out, err = s.GetNodeID(types.NodeID("0123456789abcdefghijklmnopqrstuvwxyz"), nil, "")
if err == nil || out != nil || !strings.Contains(err.Error(), "node lookup by ID failed: index error: invalid UUID") {
t.Errorf("want an error, nil value, err:=%q ; out:=%q", err, out)
t.Errorf("want an error, nil value, err:=%q ; out:=%+v", err, out)
}

_, out, err = s.GetNodeID(types.NodeID("00a916bc-a357-4a19-b886-59419fcee50Z"), nil, "")
if err == nil || out != nil || !strings.Contains(err.Error(), "node lookup by ID failed: index error: invalid UUID") {
t.Errorf("want an error, nil value, err:=%q ; out:=%q", err, out)
t.Errorf("want an error, nil value, err:=%q ; out:=%+v", err, out)
}

_, out, err = s.GetNodeID(types.NodeID("00a916bc-a357-4a19-b886-59419fcee506"), nil, "")
if err != nil || out != nil {
t.Errorf("do not want any error nor returned value, err:=%q ; out:=%q", err, out)
t.Errorf("do not want any error nor returned value, err:=%q ; out:=%+v", err, out)
}

nodeID := types.NodeID("00a916bc-a357-4a19-b886-59419fceeaaa")
Expand Down Expand Up @@ -219,6 +219,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
TaggedAddresses: map[string]string{"hello": "world"},
NodeMeta: map[string]string{"somekey": "somevalue"},
PeerName: peerName,
Locality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"},
}
if f != nil {
f(req)
Expand All @@ -236,6 +237,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
Meta: map[string]string{"somekey": "somevalue"},
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 1},
PeerName: peerName,
Locality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"},
}

_, out, err := s.GetNode("node1", nil, peerName)
Expand All @@ -259,6 +261,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
RaftIndex: structs.RaftIndex{CreateIndex: 2, ModifyIndex: 2},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
PeerName: peerName,
Locality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"},
},
}

Expand Down Expand Up @@ -368,6 +371,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
Meta: map[string]string{strings.Repeat("a", 129): "somevalue"},
Tags: []string{"primary"},
PeerName: peerName,
Locality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"},
}
})
testutil.RequireErrorContains(t, s.EnsureRegistration(9, req), `Key is too long (limit: 128 characters)`)
Expand All @@ -384,6 +388,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
Tags: []string{"primary"},
Weights: &structs.Weights{Passing: 1, Warning: 1},
PeerName: peerName,
Locality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"},
}
})
require.NoError(t, s.EnsureRegistration(2, req))
Expand All @@ -404,6 +409,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
Tags: []string{"primary"},
Weights: &structs.Weights{Passing: 1, Warning: 1},
PeerName: peerName,
Locality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"},
}
req.Check = &structs.HealthCheck{
Node: "node1",
Expand Down Expand Up @@ -432,6 +438,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
Tags: []string{"primary"},
Weights: &structs.Weights{Passing: 1, Warning: 1},
PeerName: peerName,
Locality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"},
}
req.Check = &structs.HealthCheck{
Node: "node1",
Expand Down Expand Up @@ -939,7 +946,7 @@ func TestNodeRenamingNodes(t *testing.T) {
}

if _, node, err := s.GetNodeID(nodeID1, nil, ""); err != nil || node == nil || node.ID != nodeID1 {
t.Fatalf("err: %s, node:= %q", err, node)
t.Fatalf("err: %s, node:= %+v", err, node)
}

if _, node, err := s.GetNodeID(nodeID2, nil, ""); err != nil && node == nil || node.ID != nodeID2 {
Expand Down Expand Up @@ -1121,7 +1128,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
_, out, err = s.GetNode("node1", nil, "")
require.NoError(t, err)
if out != nil {
t.Fatalf("Node should not exist anymore: %q", out)
t.Fatalf("Node should not exist anymore: %+v", out)
}

idx, out, err = s.GetNode("node1-renamed", nil, "")
Expand Down Expand Up @@ -1277,10 +1284,10 @@ func TestStateStore_EnsureNode(t *testing.T) {
t.Fatalf("[DEPRECATED] err: %s", err)
}
if out.CreateIndex != 10 {
t.Fatalf("[DEPRECATED] We expected to modify node previously added, but add index = %d for node %q", out.CreateIndex, out)
t.Fatalf("[DEPRECATED] We expected to modify node previously added, but add index = %d for node %+v", out.CreateIndex, out)
}
if out.Address != "1.1.1.66" || out.ModifyIndex != 15 {
t.Fatalf("[DEPRECATED] Node with newNodeID should have been updated, but was: %d with content := %q", out.CreateIndex, out)
t.Fatalf("[DEPRECATED] Node with newNodeID should have been updated, but was: %d with content := %+v", out.CreateIndex, out)
}
}

Expand Down
19 changes: 10 additions & 9 deletions agent/consul/state/peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbcommon"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/hashicorp/consul/sdk/testutil"
Expand Down Expand Up @@ -1261,7 +1262,7 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
Expand All @@ -1276,7 +1277,7 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
Expand Down Expand Up @@ -1311,7 +1312,7 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
Expand Down Expand Up @@ -1344,7 +1345,7 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
Expand Down Expand Up @@ -1377,7 +1378,7 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
Expand Down Expand Up @@ -1409,7 +1410,7 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
Expand Down Expand Up @@ -1440,7 +1441,7 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
Expand Down Expand Up @@ -1471,7 +1472,7 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
Expand Down Expand Up @@ -1501,7 +1502,7 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
Expand Down
3 changes: 3 additions & 0 deletions agent/local/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Config struct {
DiscardCheckOutput bool
NodeID types.NodeID
NodeName string
NodeLocality *structs.Locality
Partition string // this defaults if empty
TaggedAddresses map[string]string
}
Expand Down Expand Up @@ -1073,6 +1074,7 @@ func (l *State) updateSyncState() error {
// Check if node info needs syncing
if svcNode == nil || svcNode.ID != l.config.NodeID ||
!reflect.DeepEqual(svcNode.TaggedAddresses, l.config.TaggedAddresses) ||
!reflect.DeepEqual(svcNode.Locality, l.config.NodeLocality) ||
!reflect.DeepEqual(svcNode.Meta, l.metadata) {
l.nodeInfoInSync = false
}
Expand Down Expand Up @@ -1565,6 +1567,7 @@ func (l *State) syncNodeInfo() error {
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
Locality: l.config.NodeLocality,
NodeMeta: l.metadata,
EnterpriseMeta: l.agentEnterpriseMeta,
WriteRequest: structs.WriteRequest{Token: at},
Expand Down
Loading

0 comments on commit 57e2493

Please sign in to comment.