Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share peer datacenter and partition info #14889

Merged
merged 1 commit into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/14889.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
peering: Add peering datacenter and partition to initial handshake.
```
4 changes: 4 additions & 0 deletions agent/consul/leader_peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
Open: &pbpeerstream.ReplicationMessage_Open{
PeerID: peer.PeerID,
StreamSecretID: secret.GetStream().GetActiveSecretID(),
Remote: &pbpeering.RemoteInfo{
Partition: peer.Partition,
Datacenter: s.config.Datacenter,
},
},
},
}
Expand Down
102 changes: 102 additions & 0 deletions agent/consul/leader_peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,108 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
})
}

func TestLeader_Peering_RemoteInfo(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

ca := connect.TestCA(t, nil)
_, acceptingServer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "accepting-server"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.PeeringEnabled = true
c.GRPCTLSPort = freeport.GetOne(t)
c.CAConfig = &structs.CAConfiguration{
ClusterID: connect.TestClusterID,
Provider: structs.ConsulCAProvider,
Config: map[string]interface{}{
"PrivateKey": ca.SigningKey,
"RootCert": ca.RootCert,
},
}
})
testrpc.WaitForLeader(t, acceptingServer.RPC, "dc1")

// Create a peering by generating a token.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err := grpc.DialContext(ctx, acceptingServer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptingServer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

acceptingClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialing-server",
}
resp, err := acceptingClient.GenerateToken(ctx, &req)
require.NoError(t, err)
tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)
var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))

// Ensure that the token contains the correct partition and dc
require.Equal(t, "dc1", token.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, token.Remote.Partition)

// Bring up dialingServer and store acceptingServer's token so that it attempts to dial.
_, dialingServer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialing-server"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
c.PeeringEnabled = true
})
testrpc.WaitForLeader(t, dialingServer.RPC, "dc2")

// Create a peering at s2 by establishing a peering with s1's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err = grpc.DialContext(ctx, dialingServer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialingServer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

dialingClient := pbpeering.NewPeeringServiceClient(conn)

establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-s1",
PeeringToken: resp.PeeringToken,
}
_, err = dialingClient.Establish(ctx, &establishReq)
require.NoError(t, err)

// Ensure that the dialer's remote info contains the acceptor's dc.
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
p, err := dialingClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
require.NoError(t, err)
require.Equal(t, "dc1", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)

// Retry fetching the until the peering is active in the acceptor.
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
p = nil
retry.Run(t, func(r *retry.R) {
p, err = acceptingClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-dialing-server"})
require.NoError(r, err)
require.Equal(r, pbpeering.PeeringState_ACTIVE, p.Peering.State)
})

// Ensure that the acceptor's remote info contains the dialer's dc.
require.NotNil(t, p)
require.Equal(t, "dc2", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
}

// Test that the dialing peer attempts to reestablish connections when the accepting peer
// shuts down without sending a Terminated message.
//
Expand Down
9 changes: 9 additions & 0 deletions agent/consul/state/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,15 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
if req.Peering.State == pbpeering.PeeringState_UNDEFINED {
req.Peering.State = existing.State
}

// Prevent RemoteInfo from being overwritten with empty data
if !existing.Remote.IsEmpty() && req.Peering.Remote.IsEmpty() {
req.Peering.Remote = &pbpeering.RemoteInfo{
Partition: existing.Remote.Partition,
Datacenter: existing.Remote.Datacenter,
}
}

req.Peering.StreamStatus = nil
req.Peering.CreateIndex = existing.CreateIndex
req.Peering.ModifyIndex = idx
Expand Down
62 changes: 62 additions & 0 deletions agent/consul/state/peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,7 @@ func TestStore_PeeringWrite(t *testing.T) {
require.Equal(t, tc.expect.peering.State, p.State)
require.Equal(t, tc.expect.peering.Name, p.Name)
require.Equal(t, tc.expect.peering.Meta, p.Meta)
require.Equal(t, tc.expect.peering.Remote, p.Remote)
if tc.expect.peering.DeletedAt != nil {
require.Equal(t, tc.expect.peering.DeletedAt, p.DeletedAt)
}
Expand Down Expand Up @@ -1227,13 +1228,21 @@ func TestStore_PeeringWrite(t *testing.T) {
State: pbpeering.PeeringState_FAILING,
PeerServerAddresses: []string{"localhost:8502"},
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
},
expect: expectations{
peering: &pbpeering.Peering{
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_FAILING,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testBazPeerID,
Expand Down Expand Up @@ -1261,6 +1270,39 @@ func TestStore_PeeringWrite(t *testing.T) {
Name: "baz",
// Previous failing state is picked up.
State: pbpeering.PeeringState_FAILING,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testBazPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testBazSecretID,
},
},
},
},
{
name: "if no remote info was included in request it is inherited from existing",
input: &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_ACTIVE,
PeerServerAddresses: []string{"localhost:8502"},
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
},
},
expect: expectations{
peering: &pbpeering.Peering{
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_ACTIVE,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testBazPeerID,
Expand All @@ -1286,6 +1328,10 @@ func TestStore_PeeringWrite(t *testing.T) {
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_TERMINATED,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
// Secrets for baz should have been deleted
secrets: nil,
Expand All @@ -1310,6 +1356,10 @@ func TestStore_PeeringWrite(t *testing.T) {
ID: testBazPeerID,
Name: "baz",
State: pbpeering.PeeringState_TERMINATED,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
// Meta should be unchanged.
Meta: nil,
},
Expand All @@ -1333,6 +1383,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Name: "baz",
State: pbpeering.PeeringState_DELETING,
DeletedAt: structs.TimeToProto(testTime),
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
secrets: nil,
},
Expand All @@ -1356,6 +1410,10 @@ func TestStore_PeeringWrite(t *testing.T) {
// Still marked as deleting at the original testTime
State: pbpeering.PeeringState_DELETING,
DeletedAt: structs.TimeToProto(testTime),
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
// Secrets for baz should have been deleted
secrets: nil,
Expand All @@ -1378,6 +1436,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Name: "baz",
// Still marked as deleting
State: pbpeering.PeeringState_DELETING,
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
},
},
// Secrets for baz should have been deleted
secrets: nil,
Expand Down
18 changes: 15 additions & 3 deletions agent/grpc-external/services/peerstream/stream_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,21 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must specify a PeerID")
}

_, p, err := s.GetStore().PeeringReadByID(nil, req.PeerID)
var p *pbpeering.Peering
_, p, err = s.GetStore().PeeringReadByID(nil, req.PeerID)
if err != nil {
logger.Error("failed to look up peer", "peer_id", req.PeerID, "error", err)
return grpcstatus.Error(codes.Internal, "failed to find PeerID: "+req.PeerID)
}
if p == nil {
return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID)
}
// Clone the peering because we will modify and rewrite it.
p, ok := proto.Clone(p).(*pbpeering.Peering)
if !ok {
return grpcstatus.Errorf(codes.Internal, "unexpected error while cloning a Peering object.")
}

if !p.IsActive() {
// If peering is terminated, then our peer sent the termination message.
// For other non-active states, send the termination message.
Expand Down Expand Up @@ -215,9 +222,14 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
},
},
}
err = s.Backend.PeeringSecretsWrite(promoted)

p.Remote = req.Remote
err = s.Backend.PeeringWrite(&pbpeering.PeeringWriteRequest{
Peering: p,
SecretsRequest: promoted,
})
if err != nil {
return grpcstatus.Errorf(codes.Internal, "failed to persist peering secret: %v", err)
return grpcstatus.Errorf(codes.Internal, "failed to persist peering: %v", err)
}
}
if !authorized {
Expand Down
9 changes: 8 additions & 1 deletion agent/rpc/peering/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@ func (s *Server) GenerateToken(
ServerAddresses: serverAddrs,
ServerName: serverName,
EstablishmentSecret: secretID,
Remote: structs.PeeringTokenRemote{
Partition: req.PartitionOrDefault(),
Datacenter: s.Datacenter,
},
}

encoded, err := s.Backend.EncodeToken(&tok)
Expand Down Expand Up @@ -416,9 +420,12 @@ func (s *Server) Establish(
PeerID: tok.PeerID,
Meta: req.Meta,
State: pbpeering.PeeringState_ESTABLISHING,

// PartitionOrEmpty is used to avoid writing "default" in OSS.
Partition: entMeta.PartitionOrEmpty(),
Remote: &pbpeering.RemoteInfo{
Partition: tok.Remote.Partition,
Datacenter: tok.Remote.Datacenter,
},
}

tlsOption, err := peering.TLSDialOption()
Expand Down
11 changes: 10 additions & 1 deletion agent/rpc/peering/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ func TestPeeringService_GenerateToken(t *testing.T) {
secret string
)
testutil.RunStep(t, "peering token is generated with data", func(t *testing.T) {
req := pbpeering.GenerateTokenRequest{PeerName: "peerB", Meta: map[string]string{"foo": "bar"}}
req := pbpeering.GenerateTokenRequest{
PeerName: "peerB",
Meta: map[string]string{"foo": "bar"},
}
resp, err := client.GenerateToken(ctx, &req)
require.NoError(t, err)

Expand All @@ -108,6 +111,7 @@ func TestPeeringService_GenerateToken(t *testing.T) {
_, roots, err := s.Server.FSM().State().CARoots(nil)
require.NoError(t, err)
require.Equal(t, []string{roots.Active().RootCert}, token.CA)
require.Equal(t, "dc1", token.Remote.Datacenter)

require.NotEmpty(t, token.EstablishmentSecret)
secret = token.EstablishmentSecret
Expand Down Expand Up @@ -427,6 +431,8 @@ func TestPeeringService_Establish(t *testing.T) {
// TODO(peering): see note on newTestServer, refactor to not use this
s1 := newTestServer(t, func(conf *consul.Config) {
conf.NodeName = "s1"
conf.Datacenter = "test-dc1"
conf.PrimaryDatacenter = "test-dc1"
})
client1 := pbpeering.NewPeeringServiceClient(s1.ClientConn(t))

Expand Down Expand Up @@ -474,6 +480,9 @@ func TestPeeringService_Establish(t *testing.T) {
require.Equal(t, token.CA, resp.Peering.PeerCAPems)
require.Equal(t, token.ServerAddresses, resp.Peering.PeerServerAddresses)
require.Equal(t, token.ServerName, resp.Peering.PeerServerName)
require.Equal(t, "test-dc1", token.Remote.Datacenter)
require.Equal(t, "test-dc1", resp.Peering.Remote.Datacenter)
require.Equal(t, token.Remote.Partition, resp.Peering.Remote.Partition)
})

testutil.RunStep(t, "stream secret is persisted", func(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions agent/structs/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ type PeeringToken struct {
ServerName string
PeerID string
EstablishmentSecret string
Remote PeeringTokenRemote
}

type PeeringTokenRemote struct {
Partition string
Datacenter string
}

type IndexedExportedServiceList struct {
Expand Down
Loading