Skip to content

Commit

Permalink
Implement In-Process gRPC for use by controller caching/indexing
Browse files Browse the repository at this point in the history
This replaces the pipe base listener implementation we were previously using. The new style CAN avoid cloning resources which our controller caching/indexing is taking advantage of to not duplicate resource objects in memory.

To maintain safety for controllers and for them to be able to modify data they get back from the cache and the resource service, the client they are presented in their runtime will be wrapped with an autogenerated client which clones request and response messages as they pass through the client.

Another sizable change in this PR is to consolidate how server specific gRPC services get registered and managed. Before this was in a bunch of different methods and it was difficult to track down how gRPC services were registered. Now its all in one place.
  • Loading branch information
mkeeler committed Jan 10, 2024
1 parent 25b37d7 commit ac4ad5f
Show file tree
Hide file tree
Showing 70 changed files with 3,615 additions and 431 deletions.
3 changes: 2 additions & 1 deletion .grpcmocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# SPDX-License-Identifier: BUSL-1.1

with-expecter: true
all: true
recursive: true
include-regex: ".*"
exclude-regex: "(serverStream|Is(Inmem|Cloning).*Client)"
# We don't want the mocks within proto-public to prevent forcing a dependency
# of the testify library on the modules usage. The mocks are only for
# internal testing purposes. Other consumers can generated the mocks into
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,13 @@ func (s *Server) establishLeadership(ctx context.Context) error {
}

if s.useV2Tenancy {
if err := s.initTenancy(ctx, s.resourceServiceServer.Backend); err != nil {
if err := s.initTenancy(ctx, s.storageBackend); err != nil {
return err
}
}

if s.useV2Resources {
if err := s.initConsulService(ctx, s.insecureResourceServiceClient); err != nil {
if err := s.initConsulService(ctx, pbresource.NewResourceServiceClient(s.insecureSafeGRPCChan)); err != nil {
return err
}
}
Expand Down
6 changes: 3 additions & 3 deletions agent/consul/leader_ce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@ func TestServer_InitTenancy(t *testing.T) {
Name: resource.DefaultNamespaceName,
}

ns, err := s.resourceServiceServer.Backend.Read(context.Background(), storage.StrongConsistency, nsID)
ns, err := s.storageBackend.Read(context.Background(), storage.StrongConsistency, nsID)
require.NoError(t, err)
require.Equal(t, resource.DefaultNamespaceName, ns.Id.Name)

// explicitly call initiTenancy to verify we do not re-create namespace
err = s.initTenancy(context.Background(), s.resourceServiceServer.Backend)
err = s.initTenancy(context.Background(), s.storageBackend)
require.NoError(t, err)

// read again
actual, err := s.resourceServiceServer.Backend.Read(context.Background(), storage.StrongConsistency, nsID)
actual, err := s.storageBackend.Read(context.Background(), storage.StrongConsistency, nsID)
require.NoError(t, err)

require.Equal(t, ns.Id.Uid, actual.Id.Uid)
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/leader_peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,7 +1426,7 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p2}))

// connect the stream
mst1, err := s2.peeringServer.Tracker.Connected(s2PeerID1)
mst1, err := s2.peerStreamServer.Tracker.Connected(s2PeerID1)
require.NoError(t, err)

// mimic tracking exported services
Expand All @@ -1437,7 +1437,7 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
})

// connect the stream
mst2, err := s2.peeringServer.Tracker.Connected(s2PeerID2)
mst2, err := s2.peerStreamServer.Tracker.Connected(s2PeerID2)
require.NoError(t, err)

// mimic tracking exported services
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func Test_InitConsulService(t *testing.T) {

testrpc.WaitForRaftLeader(t, s.RPC, "dc1", testrpc.WithToken("root"))

client := s.insecureResourceServiceClient
client := pbresource.NewResourceServiceClient(s.insecureSafeGRPCChan)

consulServiceID := &pbresource.ID{
Name: structs.ConsulServiceName,
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
s.handleInsecureConn(conn)

case pool.RPCGRPC:
s.grpcHandler.Handle(conn)
s.internalGRPCHandler.Handle(conn)

case pool.RPCRaftForwarding:
s.handleRaftForwarding(conn)
Expand Down Expand Up @@ -315,7 +315,7 @@ func (s *Server) handleNativeTLS(conn net.Conn) {
s.handleSnapshotConn(tlsConn)

case pool.ALPN_RPCGRPC:
s.grpcHandler.Handle(tlsConn)
s.internalGRPCHandler.Handle(tlsConn)

case pool.ALPN_RPCRaftForwarding:
s.handleRaftForwarding(tlsConn)
Expand Down
Loading

0 comments on commit ac4ad5f

Please sign in to comment.