Skip to content

Commit

Permalink
Add Peer Locality to Discovery Chains (#16588)
Browse files Browse the repository at this point in the history
Add peer locality to discovery chains
  • Loading branch information
erichaberkorn authored Mar 10, 2023
1 parent 57e2493 commit e298f50
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 6 deletions.
13 changes: 13 additions & 0 deletions agent/configentry/discoverychain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package configentry

import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
)

// DiscoveryChainSet is a wrapped set of raw cross-referenced config entries
Expand All @@ -13,6 +14,7 @@ type DiscoveryChainSet struct {
Splitters map[structs.ServiceID]*structs.ServiceSplitterConfigEntry
Resolvers map[structs.ServiceID]*structs.ServiceResolverConfigEntry
Services map[structs.ServiceID]*structs.ServiceConfigEntry
Peers map[string]*pbpeering.Peering
ProxyDefaults map[string]*structs.ProxyConfigEntry
}

Expand All @@ -22,6 +24,7 @@ func NewDiscoveryChainSet() *DiscoveryChainSet {
Splitters: make(map[structs.ServiceID]*structs.ServiceSplitterConfigEntry),
Resolvers: make(map[structs.ServiceID]*structs.ServiceResolverConfigEntry),
Services: make(map[structs.ServiceID]*structs.ServiceConfigEntry),
Peers: make(map[string]*pbpeering.Peering),
ProxyDefaults: make(map[string]*structs.ProxyConfigEntry),
}
}
Expand Down Expand Up @@ -111,6 +114,16 @@ func (e *DiscoveryChainSet) AddProxyDefaults(entries ...*structs.ProxyConfigEntr
}
}

// AddPeers adds cluster peers. Convenience function for testing.
func (e *DiscoveryChainSet) AddPeers(entries ...*pbpeering.Peering) {
if e.Peers == nil {
e.Peers = make(map[string]*pbpeering.Peering)
}
for _, entry := range entries {
e.Peers[entry.Name] = entry
}
}

// AddEntries adds generic configs. Convenience function for testing. Panics on
// operator error.
func (e *DiscoveryChainSet) AddEntries(entries ...structs.ConfigEntry) {
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/discoverychain/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
)

type CompileRequest struct {
Expand Down Expand Up @@ -736,6 +737,11 @@ func (c *compiler) newTarget(opts structs.DiscoveryTargetOpts) *structs.Discover
// Use the same representation for the name. This will NOT be overridden
// later.
t.Name = t.SNI
} else {
peer := c.entries.Peers[opts.Peer]
if peer != nil && peer.Remote != nil {
t.Locality = pbpeering.LocalityToStructs(peer.Remote.Locality)
}
}

prev, ok := c.loadedTargets[t.ID]
Expand Down
31 changes: 31 additions & 0 deletions agent/consul/discoverychain/compile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbcommon"
"github.com/hashicorp/consul/proto/private/pbpeering"
)

type compileTestCase struct {
Expand Down Expand Up @@ -1578,12 +1580,25 @@ func testcase_Failover_Targets() compileTestCase {
{Datacenter: "dc3"},
{Service: "new-main"},
{Peer: "cluster-01"},
{Peer: "cluster-02"},
},
},
},
},
)

entries.AddPeers(
&pbpeering.Peering{
Name: "cluster-01",
Remote: &pbpeering.RemoteInfo{
Locality: &pbcommon.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
)

expect := &structs.CompiledDiscoveryChain{
Protocol: "tcp",
StartNode: "resolver:main.default.default.dc1",
Expand All @@ -1599,6 +1614,7 @@ func testcase_Failover_Targets() compileTestCase {
"main.default.default.dc3",
"new-main.default.default.dc1",
"main.default.default.external.cluster-01",
"main.default.default.external.cluster-02",
},
},
},
Expand Down Expand Up @@ -1626,6 +1642,21 @@ func testcase_Failover_Targets() compileTestCase {
"main.default.default.external.cluster-01": newTarget(structs.DiscoveryTargetOpts{
Service: "main",
Peer: "cluster-01",
}, func(t *structs.DiscoveryTarget) {
t.SNI = ""
t.Name = ""
t.Datacenter = ""
t.MeshGateway = structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
}
t.Locality = &structs.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
}
}),
"main.default.default.external.cluster-02": newTarget(structs.DiscoveryTargetOpts{
Service: "main",
Peer: "cluster-02",
}, func(t *structs.DiscoveryTarget) {
t.SNI = ""
t.Name = ""
Expand Down
22 changes: 22 additions & 0 deletions agent/consul/state/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,7 @@ func readDiscoveryChainConfigEntriesTxn(
todoSplitters = make(map[structs.ServiceID]struct{})
todoResolvers = make(map[structs.ServiceID]struct{})
todoDefaults = make(map[structs.ServiceID]struct{})
todoPeers = make(map[string]struct{})
)

sid := structs.NewServiceID(serviceName, entMeta)
Expand Down Expand Up @@ -1394,6 +1395,10 @@ func readDiscoveryChainConfigEntriesTxn(
for _, svc := range resolver.ListRelatedServices() {
todoResolvers[svc] = struct{}{}
}

for _, peer := range resolver.RelatedPeers() {
todoPeers[peer] = struct{}{}
}
}

for {
Expand Down Expand Up @@ -1435,6 +1440,23 @@ func readDiscoveryChainConfigEntriesTxn(
res.Services[svcID] = entry
}

peerEntMeta := structs.DefaultEnterpriseMetaInPartition(entMeta.PartitionOrDefault())
for peerName := range todoPeers {
q := Query{
Value: peerName,
EnterpriseMeta: *peerEntMeta,
}
idx, entry, err := peeringReadTxn(tx, ws, q)
if err != nil {
return 0, nil, err
}
if idx > maxIdx {
maxIdx = idx
}

res.Peers[peerName] = entry
}

// Strip nils now that they are no longer necessary.
for sid, entry := range res.Routers {
if entry == nil {
Expand Down
49 changes: 49 additions & 0 deletions agent/consul/state/config_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/hashicorp/consul/sdk/testutil"
)

Expand Down Expand Up @@ -2065,6 +2067,53 @@ func TestStore_ReadDiscoveryChainConfigEntries_SubsetSplit(t *testing.T) {
require.Len(t, entrySet.Services, 1)
}

func TestStore_ReadDiscoveryChainConfigEntries_FetchPeers(t *testing.T) {
s := testConfigStateStore(t)

entries := []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "main",
Protocol: "http",
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "main",
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Targets: []structs.ServiceResolverFailoverTarget{
{Peer: "cluster-01"},
{Peer: "cluster-02"}, // Non-existant
},
},
},
},
}

for _, entry := range entries {
require.NoError(t, s.EnsureConfigEntry(0, entry))
}

cluster01Peering := &pbpeering.Peering{
ID: testFooPeerID,
Name: "cluster-01",
}
err := s.PeeringWrite(0, &pbpeering.PeeringWriteRequest{Peering: cluster01Peering})
require.NoError(t, err)

_, entrySet, err := s.readDiscoveryChainConfigEntries(nil, "main", nil, nil)
require.NoError(t, err)

require.Len(t, entrySet.Routers, 0)
require.Len(t, entrySet.Splitters, 0)
require.Len(t, entrySet.Resolvers, 1)
require.Len(t, entrySet.Services, 1)
prototest.AssertDeepEqual(t, entrySet.Peers, map[string]*pbpeering.Peering{
"cluster-01": cluster01Peering,
"cluster-02": nil,
})
}

// TODO(rb): add ServiceIntentions tests

func TestStore_ValidateGatewayNamesCannotBeShared(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions agent/structs/config_entry_discoverychain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/maps"
)

const (
Expand Down Expand Up @@ -871,6 +872,26 @@ type ServiceResolverConfigEntry struct {
RaftIndex
}

func (e *ServiceResolverConfigEntry) RelatedPeers() []string {
peers := make(map[string]struct{})

if r := e.Redirect; r != nil && r.Peer != "" {
peers[r.Peer] = struct{}{}
}

if e.Failover != nil {
for _, f := range e.Failover {
for _, t := range f.Targets {
if t.Peer != "" {
peers[t.Peer] = struct{}{}
}
}
}
}

return maps.SliceOfKeys(peers)
}

func (e *ServiceResolverConfigEntry) MarshalJSON() ([]byte, error) {
type Alias ServiceResolverConfigEntry
exported := &struct {
Expand Down
13 changes: 7 additions & 6 deletions agent/structs/discovery_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,13 @@ type DiscoveryTarget struct {
// chain. It should be treated as a per-compile opaque string.
ID string `json:",omitempty"`

Service string `json:",omitempty"`
ServiceSubset string `json:",omitempty"`
Namespace string `json:",omitempty"`
Partition string `json:",omitempty"`
Datacenter string `json:",omitempty"`
Peer string `json:",omitempty"`
Service string `json:",omitempty"`
ServiceSubset string `json:",omitempty"`
Namespace string `json:",omitempty"`
Partition string `json:",omitempty"`
Datacenter string `json:",omitempty"`
Peer string `json:",omitempty"`
Locality *Locality `json:",omitempty"`

MeshGateway MeshGatewayConfig `json:",omitempty"`
Subset ServiceResolverSubset `json:",omitempty"`
Expand Down
4 changes: 4 additions & 0 deletions agent/structs/structs.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func (o *CompiledDiscoveryChain) DeepCopy() *CompiledDiscoveryChain {
if v2 != nil {
cp_Targets_v2 = new(DiscoveryTarget)
*cp_Targets_v2 = *v2
if v2.Locality != nil {
cp_Targets_v2.Locality = new(Locality)
*cp_Targets_v2.Locality = *v2.Locality
}
}
cp.Targets[k2] = cp_Targets_v2
}
Expand Down

0 comments on commit e298f50

Please sign in to comment.