Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-4657/add resource service client #18053

Merged
merged 1 commit into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agent/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"

Expand Down Expand Up @@ -163,6 +164,9 @@ func (a *TestACLAgent) Stats() map[string]map[string]string {
func (a *TestACLAgent) ReloadConfig(_ consul.ReloadableConfig) error {
return fmt.Errorf("Unimplemented")
}
func (a *TestACLAgent) ResourceServiceClient() pbresource.ResourceServiceClient {
Copy link
Member

@wangxinyi7 wangxinyi7 Jul 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this here? I didn't see this function referenced anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah to resolve type error

return nil
}

func TestACL_Version8EnabledByDefault(t *testing.T) {
t.Parallel()
Expand Down
4 changes: 4 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/hashicorp/consul/lib/mutex"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/pboperator"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/tlsutil"
Expand Down Expand Up @@ -198,6 +199,9 @@ type delegate interface {

RPC(ctx context.Context, method string, args interface{}, reply interface{}) error

// ResourceServiceClient is a client for the gRPC Resource Service.
ResourceServiceClient() pbresource.ResourceServiceClient

SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
Shutdown() error
Stats() map[string]map[string]string
Expand Down
15 changes: 15 additions & 0 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
)
Expand Down Expand Up @@ -93,6 +94,9 @@ type Client struct {
EnterpriseClient

tlsConfigurator *tlsutil.Configurator

// resourceServiceClient is a client for the gRPC Resource Service.
resourceServiceClient pbresource.ResourceServiceClient
}

// NewClient creates and returns a Client
Expand Down Expand Up @@ -151,6 +155,13 @@ func NewClient(config *Config, deps Deps) (*Client, error) {
}
c.router = deps.Router

conn, err := deps.GRPCConnPool.ClientConn(deps.ConnPool.Datacenter)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to get gRPC client connection: %w", err)
}
c.resourceServiceClient = pbresource.NewResourceServiceClient(conn)

// Start LAN event handlers after the router is complete since the event
// handlers depend on the router and the router depends on Serf.
go c.lanEventHandler()
Expand Down Expand Up @@ -451,3 +462,7 @@ func (c *Client) AgentEnterpriseMeta() *acl.EnterpriseMeta {
func (c *Client) agentSegmentName() string {
return c.config.Segment
}

func (c *Client) ResourceServiceClient() pbresource.ResourceServiceClient {
return c.resourceServiceClient
}
78 changes: 62 additions & 16 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,20 @@ type Server struct {
// typeRegistry contains Consul's registered resource types.
typeRegistry resource.Registry

// internalResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) without auth.
// It should only be used for purely-internal workloads, such as controllers.
internalResourceServiceClient pbresource.ResourceServiceClient
// resourceServiceServer implements the Resource Service.
resourceServiceServer *resourcegrpc.Server

// insecureResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) *without*
// auth. It should only be used for purely-internal workloads, such as
// controllers.
insecureResourceServiceClient pbresource.ResourceServiceClient

// secureResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) *with* auth.
// It can be used to make requests to the Resource Service on behalf of the user
// (e.g. from the HTTP API).
secureResourceServiceClient pbresource.ResourceServiceClient

// controllerManager schedules the execution of controllers.
controllerManager *controller.Manager
Expand Down Expand Up @@ -803,11 +813,16 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
s.grpcLeaderForwarder = flat.LeaderForwarder

if err := s.setupInternalResourceService(logger); err != nil {
if err := s.setupSecureResourceServiceClient(); err != nil {
return nil, err
}

if err := s.setupInsecureResourceServiceClient(logger); err != nil {
return nil, err
}

s.controllerManager = controller.NewManager(
s.internalResourceServiceClient,
s.insecureResourceServiceClient,
logger.Named(logging.ControllerRuntime),
)
s.registerResources(flat)
Expand Down Expand Up @@ -929,6 +944,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
s.peerStreamServer.Register(srv)
s.externalACLServer.Register(srv)
s.externalConnectCAServer.Register(srv)
s.resourceServiceServer.Register(srv)
}

return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register, nil, s.incomingRPCLimiter)
Expand Down Expand Up @@ -1334,23 +1350,50 @@ func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
})
s.peerStreamServer.Register(s.externalGRPCServer)

resourcegrpc.NewServer(resourcegrpc.Config{
s.resourceServiceServer = resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry,
Backend: s.raftStorageBackend,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.resource"),
}).Register(s.externalGRPCServer)
})
s.resourceServiceServer.Register(s.externalGRPCServer)
}

func (s *Server) setupInternalResourceService(logger hclog.Logger) error {
server := grpc.NewServer()

resourcegrpc.NewServer(resourcegrpc.Config{
func (s *Server) setupInsecureResourceServiceClient(logger hclog.Logger) error {
server := resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry,
Backend: s.raftStorageBackend,
ACLResolver: resolver.DANGER_NO_AUTH{},
Logger: logger.Named("grpc-api.resource"),
}).Register(server)
})

conn, err := s.runInProcessGRPCServer(server.Register)
if err != nil {
return err
}
s.insecureResourceServiceClient = pbresource.NewResourceServiceClient(conn)

return nil
}

func (s *Server) setupSecureResourceServiceClient() error {
conn, err := s.runInProcessGRPCServer(s.resourceServiceServer.Register)
if err != nil {
return err
}
s.secureResourceServiceClient = pbresource.NewResourceServiceClient(conn)

return nil
}

// runInProcessGRPCServer runs a gRPC server that can only be accessed in the
// same process, rather than over the network, using a pipe listener.
func (s *Server) runInProcessGRPCServer(registerFn ...func(*grpc.Server)) (*grpc.ClientConn, error) {
server := grpc.NewServer()

for _, fn := range registerFn {
fn(server)
}

pipe := agentgrpc.NewPipeListener()
go server.Serve(pipe)
Expand All @@ -1367,15 +1410,14 @@ func (s *Server) setupInternalResourceService(logger hclog.Logger) error {
)
if err != nil {
server.Stop()
return err
return nil, err
}
go func() {
<-s.shutdownCh
conn.Close()
}()
s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn)

return nil
return conn, nil
}

// Shutdown is used to shutdown the server
Expand Down Expand Up @@ -2095,6 +2137,10 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback {
}
}

func (s *Server) ResourceServiceClient() pbresource.ResourceServiceClient {
return s.secureResourceServiceClient
}

func fileExists(name string) (bool, error) {
_, err := os.Stat(name)
if err == nil {
Expand Down