Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-5824 Exported services api #20015

Merged
merged 16 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/20015.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
api: add a new api(/v1/exported-services) to list all the exported service and their consumers.
```
7 changes: 5 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import (
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/pbconfigentry"
"github.com/hashicorp/consul/proto/private/pboperator"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/tlsutil"
Expand Down Expand Up @@ -398,8 +399,9 @@ type Agent struct {

// TODO: pass directly to HTTPHandlers and DNSServer once those are passed
// into Agent, which will allow us to remove this field.
rpcClientHealth *health.Client
rpcClientConfigEntry *configentry.Client
rpcClientHealth *health.Client
rpcClientConfigEntry *configentry.Client
grpcClientConfigEntry pbconfigentry.ConfigEntryServiceClient

rpcClientPeering pbpeering.PeeringServiceClient

Expand Down Expand Up @@ -502,6 +504,7 @@ func New(bd BaseDeps) (*Agent, error) {

a.rpcClientPeering = pbpeering.NewPeeringServiceClient(conn)
a.rpcClientOperator = pboperator.NewOperatorServiceClient(conn)
a.grpcClientConfigEntry = pbconfigentry.NewConfigEntryServiceClient(conn)

a.serviceManager = NewServiceManager(&a)
a.rpcClientConfigEntry = &configentry.Client{
Expand Down
47 changes: 47 additions & 0 deletions agent/config_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ import (
"strings"

"github.com/hashicorp/consul/acl"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/private/pbconfigentry"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

const ConfigEntryNotFoundErr string = "Config entry not found"
Expand Down Expand Up @@ -176,3 +181,45 @@ func (s *HTTPHandlers) parseEntMetaForConfigEntryKind(kind string, req *http.Req
}
return s.parseEntMetaNoWildcard(req, entMeta)
}

// ExportedServices returns all the exported services by resolving wildcards and sameness groups
// in the exported services configuration entry
func (s *HTTPHandlers) ExportedServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var entMeta acl.EnterpriseMeta
if err := s.parseEntMetaPartition(req, &entMeta); err != nil {
return nil, err
}
args := pbconfigentry.GetResolvedExportedServicesRequest{
Partition: entMeta.PartitionOrEmpty(),
}

var dc string
options := structs.QueryOptions{}
s.parse(resp, req, &dc, &options)
ctx, err := external.ContextWithQueryOptions(req.Context(), options)
if err != nil {
return nil, err
}

var header metadata.MD
result, err := s.agent.grpcClientConfigEntry.GetResolvedExportedServices(ctx, &args, grpc.Header(&header))
if err != nil {
return nil, err
}

meta, err := external.QueryMetaFromGRPCMeta(header)
if err != nil {
return result.Services, fmt.Errorf("could not convert gRPC metadata to query meta: %w", err)
}
if err := setMeta(resp, &meta); err != nil {
return nil, err
}

svcs := make([]api.ResolvedExportedService, len(result.Services))

for idx, svc := range result.Services {
svcs[idx] = *svc.ToAPI()
}

return svcs, nil
}
76 changes: 76 additions & 0 deletions agent/config_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc"
)

Expand Down Expand Up @@ -734,3 +735,78 @@ func TestConfig_Apply_ProxyDefaultsExpose(t *testing.T) {
require.Equal(t, expose, entry.Expose)
}
}

func TestConfig_Exported_Services(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()
a := NewTestAgent(t, "")
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
defer a.Shutdown()

{
// Register exported services
args := &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "api",
Consumers: []structs.ServiceConsumer{
{
Peer: "east",
},
{
Peer: "west",
},
},
},
{
Name: "db",
Consumers: []structs.ServiceConsumer{
{
Peer: "east",
},
},
},
},
}
req := structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: args,
}
var configOutput bool
require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &req, &configOutput))
require.True(t, configOutput)
}

t.Run("exported services", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/exported-services", nil)
resp := httptest.NewRecorder()
raw, err := a.srv.ExportedServices(resp, req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.Code)

services, ok := raw.([]api.ResolvedExportedService)
require.True(t, ok)
require.Len(t, services, 2)
assertIndex(t, resp)

expected := []api.ResolvedExportedService{
{
Service: "api",
Consumers: api.ResolvedConsumers{
Peers: []string{"east", "west"},
},
},
{
Service: "db",
Consumers: api.ResolvedConsumers{
Peers: []string{"east"},
},
},
}
require.ElementsMatch(t, expected, services)
})
}
31 changes: 31 additions & 0 deletions agent/consul/configentry_backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package consul

import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/grpc-external/services/configentry"
)

type ConfigEntryBackend struct {
srv *Server
}

var _ configentry.Backend = (*ConfigEntryBackend)(nil)

// NewPeeringBackend returns a peering.Backend implementation that is bound to the given server.
func NewConfigEntryBackend(srv *Server) *ConfigEntryBackend {
return &ConfigEntryBackend{
srv: srv,
}
}

func (b *ConfigEntryBackend) EnterpriseCheckPartitions(partition string) error {
return b.enterpriseCheckPartitions(partition)
}

func (b *ConfigEntryBackend) ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error) {
return b.srv.ResolveTokenAndDefaultMeta(token, entMeta, authzCtx)
}
18 changes: 18 additions & 0 deletions agent/consul/configentry_backend_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !consulent

package consul

import (
"fmt"
"strings"
)

func (b *ConfigEntryBackend) enterpriseCheckPartitions(partition string) error {
if partition == "" || strings.EqualFold(partition, "default") {
return nil
}
return fmt.Errorf("Partitions are a Consul Enterprise feature")
}
87 changes: 87 additions & 0 deletions agent/consul/configentry_backend_ce_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !consulent

package consul

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
gogrpc "google.golang.org/grpc"

"github.com/hashicorp/consul/proto/private/pbconfigentry"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/testrpc"
)

func TestConfigEntryBackend_RejectsPartition(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()

_, s1 := testServerWithConfig(t, func(c *Config) {
c.GRPCTLSPort = freeport.GetOne(t)
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")

// make a grpc client to dial s1 directly
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(cancel)

conn, err := gogrpc.DialContext(ctx, s1.config.RPCAddr.String(),
gogrpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
//nolint:staticcheck
gogrpc.WithInsecure(),
gogrpc.WithBlock())
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })

configEntryClient := pbconfigentry.NewConfigEntryServiceClient(conn)

req := pbconfigentry.GetResolvedExportedServicesRequest{
Partition: "test",
}
_, err = configEntryClient.GetResolvedExportedServices(ctx, &req)
require.Error(t, err)
require.Contains(t, err.Error(), "Partitions are a Consul Enterprise feature")
}

func TestConfigEntryBackend_IgnoresDefaultPartition(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()

_, s1 := testServerWithConfig(t, func(c *Config) {
c.GRPCTLSPort = freeport.GetOne(t)
})

testrpc.WaitForLeader(t, s1.RPC, "dc1")

// make a grpc client to dial s1 directly
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(cancel)

conn, err := gogrpc.DialContext(ctx, s1.config.RPCAddr.String(),
gogrpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
//nolint:staticcheck
gogrpc.WithInsecure(),
gogrpc.WithBlock())
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })

configEntryClient := pbconfigentry.NewConfigEntryServiceClient(conn)

req := pbconfigentry.GetResolvedExportedServicesRequest{
Partition: "DeFaUlT",
}
_, err = configEntryClient.GetResolvedExportedServices(ctx, &req)
require.NoError(t, err)
}
17 changes: 17 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/hashicorp/consul/agent/consul/wanfed"
"github.com/hashicorp/consul/agent/consul/xdscapacity"
aclgrpc "github.com/hashicorp/consul/agent/grpc-external/services/acl"
"github.com/hashicorp/consul/agent/grpc-external/services/configentry"
"github.com/hashicorp/consul/agent/grpc-external/services/connectca"
"github.com/hashicorp/consul/agent/grpc-external/services/dataplane"
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
Expand Down Expand Up @@ -429,6 +430,9 @@ type Server struct {
// operatorBackend is shared between the external and internal gRPC services for peering
operatorBackend *OperatorBackend

// configEntryBackend is shared between the external and internal gRPC services for config entries
configEntryBackend *ConfigEntryBackend

// peerStreamServer is a server used to handle peering streams from external clusters.
peerStreamServer *peerstream.Server

Expand All @@ -446,6 +450,8 @@ type Server struct {
EnterpriseServer
operatorServer *operator.Server

configEntryServer *configentry.Server

// routineManager is responsible for managing longer running go routines
// run by the Server
routineManager *routine.Manager
Expand Down Expand Up @@ -1053,6 +1059,16 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
})
s.operatorServer = o

s.configEntryBackend = NewConfigEntryBackend(s)
s.configEntryServer = configentry.NewServer(configentry.Config{
Backend: s.configEntryBackend,
Logger: deps.Logger.Named("grpc-api.configentry"),
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
FSMServer: s,
})

register := func(srv *grpc.Server) {
if config.RPCConfig.EnableStreaming {
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer(
Expand All @@ -1061,6 +1077,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
}
s.peeringServer.Register(srv)
s.operatorServer.Register(srv)
s.configEntryServer.Register(srv)
s.registerEnterpriseGRPCServices(deps, srv)

// Note: these external gRPC services are also exposed on the internal server to
Expand Down
Loading
Loading