Skip to content

Commit

Permalink
Add remote peer partition and datacenter info.
Browse files Browse the repository at this point in the history
  • Loading branch information
hashi-derek committed Oct 7, 2022
1 parent 4abad02 commit 8502a22
Show file tree
Hide file tree
Showing 17 changed files with 966 additions and 570 deletions.
3 changes: 3 additions & 0 deletions .changelog/14889.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
peering: Add peering datacenter and parition to initial handshake.
```
4 changes: 4 additions & 0 deletions agent/consul/leader_peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
Open: &pbpeerstream.ReplicationMessage_Open{
PeerID: peer.PeerID,
StreamSecretID: secret.GetStream().GetActiveSecretID(),
Remote: &pbpeering.RemoteInfo{
Partition: peer.Partition,
Datacenter: s.config.Datacenter,
},
},
},
}
Expand Down
102 changes: 102 additions & 0 deletions agent/consul/leader_peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,108 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
})
}

func TestLeader_Peering_RemoteInfo(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

ca := connect.TestCA(t, nil)
_, acceptingServer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "accepting-server"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.PeeringEnabled = true
c.GRPCTLSPort = freeport.GetOne(t)
c.CAConfig = &structs.CAConfiguration{
ClusterID: connect.TestClusterID,
Provider: structs.ConsulCAProvider,
Config: map[string]interface{}{
"PrivateKey": ca.SigningKey,
"RootCert": ca.RootCert,
},
}
})
testrpc.WaitForLeader(t, acceptingServer.RPC, "dc1")

// Create a peering by generating a token.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err := grpc.DialContext(ctx, acceptingServer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptingServer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

acceptingClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialing-server",
}
resp, err := acceptingClient.GenerateToken(ctx, &req)
require.NoError(t, err)
tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)
var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))
require.Equal(t, "dc1", token.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, token.Remote.Partition)

// Bring up dialingServer and store acceptingServer's token so that it attempts to dial.
_, dialingServer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialing-server"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
c.PeeringEnabled = true
})
testrpc.WaitForLeader(t, dialingServer.RPC, "dc2")

// Create a peering at s2 by establishing a peering with s1's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err = grpc.DialContext(ctx, dialingServer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialingServer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

dialingClient := pbpeering.NewPeeringServiceClient(conn)

establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-s1",
PeeringToken: resp.PeeringToken,
}
_, err = dialingClient.Establish(ctx, &establishReq)
require.NoError(t, err)

// Ensure that the dialer's remote info contains the acceptor's dc.
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
p, err := dialingClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
require.NoError(t, err)
require.Equal(t, "dc1", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)

// Retry fetching the until the peering is active in the acceptor.
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
p = nil
for p == nil && ctx.Err() == nil {
p, err = acceptingClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-dialing-server"})
require.NoError(t, err)
if p.Peering.State != pbpeering.PeeringState_ACTIVE {
p = nil
}
time.Sleep(50 * time.Millisecond)
}
// Ensure that the acceptor's remote info contains the dialer's dc.
require.NotNil(t, p)
require.Equal(t, "dc2", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
}

// Test that the dialing peer attempts to reestablish connections when the accepting peer
// shuts down without sending a Terminated message.
//
Expand Down
9 changes: 9 additions & 0 deletions agent/consul/state/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,15 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
if req.Peering.State == pbpeering.PeeringState_UNDEFINED {
req.Peering.State = existing.State
}

// Prevent RemoteInfo from being overwritten with empty data
if !existing.Remote.IsEmpty() && req.Peering.Remote.IsEmpty() {
req.Peering.Remote = &pbpeering.RemoteInfo{
Partition: existing.Remote.Partition,
Datacenter: existing.Remote.Datacenter,
}
}

// TODO(peering): Confirm behavior when /peering/token is called more than once.
// We may need to avoid clobbering existing values.
req.Peering.ImportedServiceCount = existing.ImportedServiceCount
Expand Down
62 changes: 62 additions & 0 deletions agent/consul/state/peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,7 @@ func TestStore_PeeringWrite(t *testing.T) {
require.Equal(t, tc.expect.peering.State, p.State)
require.Equal(t, tc.expect.peering.Name, p.Name)
require.Equal(t, tc.expect.peering.Meta, p.Meta)
require.Equal(t, tc.expect.peering.Remote, p.Remote)
if tc.expect.peering.DeletedAt != nil {
require.Equal(t, tc.expect.peering.DeletedAt, p.DeletedAt)
}
Expand Down Expand Up @@ -1227,13 +1228,21 @@ func TestStore_PeeringWrite(t *testing.T) {
State: pbpeering.PeeringState_FAILING,
PeerServerAddresses: []string{"localhost:8502"},
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
},
expect: expectations{
peering: &pbpeering.Peering{
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_FAILING,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testBazPeerID,
Expand Down Expand Up @@ -1261,6 +1270,39 @@ func TestStore_PeeringWrite(t *testing.T) {
Name: "baz",
// Previous failing state is picked up.
State: pbpeering.PeeringState_FAILING,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testBazPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testBazSecretID,
},
},
},
},
{
name: "if no remote info was included in request it is inherited from existing",
input: &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_ACTIVE,
PeerServerAddresses: []string{"localhost:8502"},
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
},
},
expect: expectations{
peering: &pbpeering.Peering{
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_ACTIVE,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testBazPeerID,
Expand All @@ -1286,6 +1328,10 @@ func TestStore_PeeringWrite(t *testing.T) {
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_TERMINATED,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
// Secrets for baz should have been deleted
secrets: nil,
Expand All @@ -1310,6 +1356,10 @@ func TestStore_PeeringWrite(t *testing.T) {
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_TERMINATED,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
// Meta should be unchanged.
Meta: nil,
},
Expand All @@ -1333,6 +1383,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Name: "baz",
State: pbpeering.PeeringState_DELETING,
DeletedAt: structs.TimeToProto(testTime),
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
secrets: nil,
},
Expand All @@ -1356,6 +1410,10 @@ func TestStore_PeeringWrite(t *testing.T) {
// Still marked as deleting at the original testTime
State: pbpeering.PeeringState_DELETING,
DeletedAt: structs.TimeToProto(testTime),
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
// Secrets for baz should have been deleted
secrets: nil,
Expand All @@ -1378,6 +1436,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Name: "baz",
// Still marked as deleting
State: pbpeering.PeeringState_DELETING,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
// Secrets for baz should have been deleted
secrets: nil,
Expand Down
18 changes: 15 additions & 3 deletions agent/grpc-external/services/peerstream/stream_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,21 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must specify a PeerID")
}

_, p, err := s.GetStore().PeeringReadByID(nil, req.PeerID)
var p *pbpeering.Peering
_, p, err = s.GetStore().PeeringReadByID(nil, req.PeerID)
if err != nil {
logger.Error("failed to look up peer", "peer_id", req.PeerID, "error", err)
return grpcstatus.Error(codes.Internal, "failed to find PeerID: "+req.PeerID)
}
if p == nil {
return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID)
}
// Clone the peering because we will modify and rewrite it.
p, ok := proto.Clone(p).(*pbpeering.Peering)
if !ok {
return grpcstatus.Errorf(codes.Internal, "unexpected error while cloning a Peering object: %v", err)
}

if !p.IsActive() {
// If peering is terminated, then our peer sent the termination message.
// For other non-active states, send the termination message.
Expand Down Expand Up @@ -215,9 +222,14 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
},
},
}
err = s.Backend.PeeringSecretsWrite(promoted)

p.Remote = req.Remote
err = s.Backend.PeeringWrite(&pbpeering.PeeringWriteRequest{
Peering: p,
SecretsRequest: promoted,
})
if err != nil {
return grpcstatus.Errorf(codes.Internal, "failed to persist peering secret: %v", err)
return grpcstatus.Errorf(codes.Internal, "failed to persist peering: %v", err)
}
}
if !authorized {
Expand Down
9 changes: 8 additions & 1 deletion agent/rpc/peering/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@ func (s *Server) GenerateToken(
ServerAddresses: serverAddrs,
ServerName: serverName,
EstablishmentSecret: secretID,
Remote: structs.PeeringTokenRemote{
Partition: req.PartitionOrDefault(),
Datacenter: s.Datacenter,
},
}

encoded, err := s.Backend.EncodeToken(&tok)
Expand Down Expand Up @@ -416,9 +420,12 @@ func (s *Server) Establish(
PeerID: tok.PeerID,
Meta: req.Meta,
State: pbpeering.PeeringState_ESTABLISHING,

// PartitionOrEmpty is used to avoid writing "default" in OSS.
Partition: entMeta.PartitionOrEmpty(),
Remote: &pbpeering.RemoteInfo{
Partition: tok.Remote.Partition,
Datacenter: tok.Remote.Datacenter,
},
}

tlsOption, err := peering.TLSDialOption()
Expand Down
11 changes: 10 additions & 1 deletion agent/rpc/peering/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ func TestPeeringService_GenerateToken(t *testing.T) {
secret string
)
testutil.RunStep(t, "peering token is generated with data", func(t *testing.T) {
req := pbpeering.GenerateTokenRequest{PeerName: "peerB", Meta: map[string]string{"foo": "bar"}}
req := pbpeering.GenerateTokenRequest{
PeerName: "peerB",
Meta: map[string]string{"foo": "bar"},
}
resp, err := client.GenerateToken(ctx, &req)
require.NoError(t, err)

Expand All @@ -108,6 +111,7 @@ func TestPeeringService_GenerateToken(t *testing.T) {
_, roots, err := s.Server.FSM().State().CARoots(nil)
require.NoError(t, err)
require.Equal(t, []string{roots.Active().RootCert}, token.CA)
require.Equal(t, "dc1", token.Remote.Datacenter)

require.NotEmpty(t, token.EstablishmentSecret)
secret = token.EstablishmentSecret
Expand Down Expand Up @@ -427,6 +431,8 @@ func TestPeeringService_Establish(t *testing.T) {
// TODO(peering): see note on newTestServer, refactor to not use this
s1 := newTestServer(t, func(conf *consul.Config) {
conf.NodeName = "s1"
conf.Datacenter = "test-dc1"
conf.PrimaryDatacenter = "test-dc1"
})
client1 := pbpeering.NewPeeringServiceClient(s1.ClientConn(t))

Expand Down Expand Up @@ -474,6 +480,9 @@ func TestPeeringService_Establish(t *testing.T) {
require.Equal(t, token.CA, resp.Peering.PeerCAPems)
require.Equal(t, token.ServerAddresses, resp.Peering.PeerServerAddresses)
require.Equal(t, token.ServerName, resp.Peering.PeerServerName)
require.Equal(t, "test-dc1", token.Remote.Datacenter)
require.Equal(t, "test-dc1", resp.Peering.Remote.Datacenter)
require.Equal(t, token.Remote.Partition, resp.Peering.Remote.Partition)
})

testutil.RunStep(t, "stream secret is persisted", func(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions agent/structs/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ type PeeringToken struct {
ServerName string
PeerID string
EstablishmentSecret string
Remote PeeringTokenRemote
}

type PeeringTokenRemote struct {
Partition string
Datacenter string
}

type IndexedExportedServiceList struct {
Expand Down
Loading

0 comments on commit 8502a22

Please sign in to comment.