Skip to content

Commit

Permalink
kv/kvclient: introduce new tenant Proxy
Browse files Browse the repository at this point in the history
Fixes cockroachdb#47909.

This commit starts by adding two RPCs to the Internal service:
```
service Internal {
	...
	rpc RangeLookup (RangeLookupRequest) returns (RangeLookupResponse)     {}
	rpc NodeInfo    (NodeInfoRequest)    returns (stream NodeInfoResponse) {}
}

// RangeLookupRequest is a request to proxy a RangeLookup through a Tenant
// service. Its fields correspond to a subset of the args of kv.RangeLookup.
message RangeLookupRequest {
    ...
}

// NodeInfoRequest is a request to establish an indefinite stream on a Tenant
// service that provides an initial NodeInfoResponse and a NodeInfoResponse
// whenever the collection of KV nodes in a cluster changes. It effectively
// proxies any updates to NodeDescriptors in the KV gossip network back to the
// client of the request.
message NodeInfoRequest {}
```

The commit then introduces a new `kvtenant.Proxy` object. Proxy mediates
the communication of cluster-wide state to sandboxed SQL-only tenant
processes through a restricted interface. A Proxy is seeded with a set
of one or more network addresses that reference existing KV nodes in the
cluster (or a load-balancer which fans out to some/all KV nodes). On
startup, it establishes contact with one of these nodes to learn about
the topology of the cluster and bootstrap the rest of SQL <-> KV network
communication.

Proxy has two main roles:

First, Proxy is capable of providing information on each of the KV nodes
in the cluster in the form of NodeDescriptors. This obviates the need
for SQL-only tenant processes to join the cluster-wide gossip network.
In doing so, it satisfies the `NodeDescStore` interface and can be used
as an `AddressResolver` with a small adapter.

Second, Proxy is capable of providing Range addressing information in
the form of RangeDescriptors through delegated RangeLookup requests.
This is necessary because SQL-only tenants are restricted from reading
Range Metadata keys directly. Instead, the RangeLookup requests are
proxied through existing KV nodes while being subject to additional
validation (e.g. is the Range being requested owned by the requesting
tenant?). In doing so, it satisfies the `RangeDescriptorDB` interface
and can be used to delegate all DistSender/RangeCache descriptor lookups
to KV nodes.

With this commit, we can mostly run a SQL-only tenant process without
joining the KV cluster's gossip network. This works if I comment out a
few of the uses of gossip due to cockroachdb#49692 and cockroachdb#47150 in SQL. Notably,
with the call to `DeprecatedRegisterSystemConfigChannel` in `sql.Server.Start`
removed, I can remove `Gossip` from `makeSQLServerArgs` entirely and
things "just work".
  • Loading branch information
nvanbenschoten committed Jun 23, 2020
1 parent 659c428 commit 019b51d
Show file tree
Hide file tree
Showing 16 changed files with 3,954 additions and 658 deletions.
929 changes: 929 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Large diffs are not rendered by default.

673 changes: 667 additions & 6 deletions c-deps/libroach/protos/roachpb/api.pb.h

Large diffs are not rendered by default.

85 changes: 77 additions & 8 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ type Gossip struct {

// resolvers is a list of resolvers used to determine
// bootstrap hosts for connecting to the gossip network.
resolverIdx int
resolvers []resolver.Resolver
resolversTried map[int]struct{} // Set of attempted resolver indexes
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
resolverIdx int
resolvers []resolver.Resolver
resolversTried map[int]struct{} // Set of attempted resolver indexes
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
nodeDescsChannels []chan<- struct{}
// storeMap maps store IDs to node IDs.
storeMap map[roachpb.StoreID]roachpb.NodeID

Expand Down Expand Up @@ -554,6 +555,14 @@ func (g *Gossip) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescript
return g.getNodeDescriptorLocked(nodeID)
}

// GetAllNodeDescriptors returns a snapshot of every node descriptor currently
// tracked by this gossip instance, ordered by ascending NodeID. The read lock
// on g.mu is held only for the duration of the call.
func (g *Gossip) GetAllNodeDescriptors() []*roachpb.NodeDescriptor {
	g.mu.RLock()
	defer g.mu.RUnlock()

	descs := g.getAllNodeDescriptorsLocked()
	return descs
}

// LogStatus logs the current status of gossip such as the incoming and
// outgoing connections.
func (g *Gossip) LogStatus() {
Expand Down Expand Up @@ -833,6 +842,7 @@ func (g *Gossip) updateNodeAddress(key string, content roachpb.Value) {
existingDesc, ok := g.nodeDescs[desc.NodeID]
if !ok || !proto.Equal(existingDesc, &desc) {
g.nodeDescs[desc.NodeID] = &desc
g.notifyNodeDescriptorsUpdatedLocked()
}
// Skip all remaining logic if the address hasn't changed, since that's all
// the logic cares about.
Expand Down Expand Up @@ -864,6 +874,50 @@ func (g *Gossip) updateNodeAddress(key string, content roachpb.Value) {
// removeNodeDescriptorLocked drops the descriptor stored for nodeID from
// g.nodeDescs, calls recomputeMaxPeersLocked, and then signals every channel
// registered through RegisterNodeDescriptorChannel so that listeners re-read
// the descriptor set.
// NOTE(review): the "Locked" suffix suggests the caller must already hold
// g.mu — confirm against the call sites.
func (g *Gossip) removeNodeDescriptorLocked(nodeID roachpb.NodeID) {
delete(g.nodeDescs, nodeID)
g.recomputeMaxPeersLocked()
// Notify last, so listeners are only signalled once the descriptor is gone.
g.notifyNodeDescriptorsUpdatedLocked()
}

// notifyNodeDescriptorsUpdatedLocked signals every channel in
// g.nodeDescsChannels that the set of node descriptors has changed. Sends are
// non-blocking: a channel that cannot accept a value right now is skipped.
func (g *Gossip) notifyNodeDescriptorsUpdatedLocked() {
	for i := range g.nodeDescsChannels {
		listener := g.nodeDescsChannels[i]
		select {
		case listener <- struct{}{}:
			// Notification delivered.
		default:
			// The listener cannot take another value at the moment; never
			// block gossip on it.
		}
	}
}

// RegisterNodeDescriptorChannel subscribes the caller to changes in the node
// descriptors tracked by this gossip instance. The returned channel carries a
// notification immediately after registration and again whenever the tracked
// descriptors change. After receiving a notification, a call to
// GetAllNodeDescriptors is guaranteed to observe the updated descriptor state.
// The returned function removes the subscription; calling it more than once
// is harmless.
func (g *Gossip) RegisterNodeDescriptorChannel() (<-chan struct{}, func()) {
	// A buffer of one lets gossip leave a pending notification behind without
	// ever blocking on the subscriber.
	updates := make(chan struct{}, 1)

	// Seed the channel so the subscriber performs an initial read.
	updates <- struct{}{}

	g.mu.Lock()
	g.nodeDescsChannels = append(g.nodeDescsChannels, updates)
	g.mu.Unlock()

	return updates, func() {
		g.mu.Lock()
		defer g.mu.Unlock()

		chans := g.nodeDescsChannels
		last := len(chans) - 1
		for idx := range chans {
			if chans[idx] != updates {
				continue
			}
			// Swap-remove: move the final entry into the vacated slot and
			// shrink the slice by one.
			chans[idx] = chans[last]
			chans[last] = nil // for GC
			g.nodeDescsChannels = chans[:last]
			return
		}
	}
}

// updateStoreMaps is a gossip callback which is used to update storeMap.
Expand Down Expand Up @@ -991,6 +1045,22 @@ func (g *Gossip) getNodeIDSQLAddressLocked(nodeID roachpb.NodeID) (*util.Unresol
return &nd.SQLAddress, nil
}

// getAllNodeDescriptorsLocked collects every descriptor stored in g.nodeDescs
// into a freshly allocated slice and sorts it by ascending NodeID. The caller
// is expected to hold the mutex; external callers go through
// GetAllNodeDescriptors.
func (g *Gossip) getAllNodeDescriptorsLocked() []*roachpb.NodeDescriptor {
	sorted := make([]*roachpb.NodeDescriptor, len(g.nodeDescs))
	next := 0
	for _, nd := range g.nodeDescs {
		sorted[next] = nd
		next++
	}

	// NodeID is the key of g.nodeDescs, so IDs are unique and the resulting
	// order is total.
	byNodeID := func(a, b int) bool {
		return sorted[a].NodeID < sorted[b].NodeID
	}
	sort.Slice(sorted, byNodeID)
	return sorted
}

// AddInfo adds or updates an info object. Returns an error if info
// couldn't be added.
func (g *Gossip) AddInfo(key string, val []byte, ttl time.Duration) error {
Expand Down Expand Up @@ -1165,19 +1235,18 @@ func (g *Gossip) GetSystemConfig() *config.SystemConfig {
// system config. It is notified after registration (if a system config is
// already set), and whenever a new system config is successfully unmarshaled.
func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} {
	// Create channel that receives new system config notifications.
	// The channel has a size of 1 to prevent gossip from blocking on it.
	c := make(chan struct{}, 1)

	// Acquire systemConfigMu exactly once and register the channel exactly
	// once. Locking a second time while the first (deferred) unlock is still
	// pending would deadlock, and a second append would deliver duplicate
	// notifications to the same subscriber.
	// NOTE(review): systemConfigMu presumably also guards g.systemConfig, so
	// the nil check below is kept inside the critical section — confirm.
	g.systemConfigMu.Lock()
	defer g.systemConfigMu.Unlock()
	g.systemConfigChannels = append(g.systemConfigChannels, c)

	// Notify the channel right away if we have a config. The send cannot
	// block: the channel was just created with a free buffer slot.
	if g.systemConfig != nil {
		c <- struct{}{}
	}
	return c
}

Expand Down
53 changes: 53 additions & 0 deletions pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
)

// TestGossipInfoStore verifies operation of gossip instance infostore.
Expand Down Expand Up @@ -935,3 +936,55 @@ func TestGossipLoopbackInfoPropagation(t *testing.T) {
return nil
})
}

// TestRegisterNodeDescriptorChannel tests NodeDescriptor subscriptions
// (RegisterNodeDescriptorChannel) and retrieval (GetAllNodeDescriptors).
//
// The test registers one listener, checks the initial notification, then adds
// three descriptors and rewrites one of them, expecting a notification and a
// matching GetAllNodeDescriptors result after every change. Finally it
// unregisters the listener and checks that no channels remain registered.
func TestRegisterNodeDescriptorChannel(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := NewTest(1, rpcContext, rpc.NewServer(rpcContext), stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
require.Equal(t, 0, len(g.nodeDescsChannels))

// Register a NodeDescriptor listener channel.
c, unregister := g.RegisterNodeDescriptorChannel()
defer unregister() // can be called multiple times
require.Equal(t, 1, len(g.nodeDescsChannels))

// The channel should already be notified.
require.Equal(t, 1, len(c))
// Drain the initial notification so later receives observe new updates only.
<-c

// Gossip starts with no NodeDescriptors.
expNodes := []*roachpb.NodeDescriptor{}
require.Equal(t, expNodes, g.GetAllNodeDescriptors())

// Add NodeDescriptors to the info store.
for i := 1; i <= 3; i++ {
node := &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(i),
Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("1.1.1.1:%d", i)),
}
if err := g.SetNodeDescriptor(node); err != nil {
t.Fatalf("failed setting node descriptor %+v: %s", node, err)
}
// Block until the listener is notified of the new descriptor.
<-c
expNodes = append(expNodes, node)
require.Equal(t, expNodes, g.GetAllNodeDescriptors())
}

// Move node 2 to the address of node 3.
// movedNode aliases expNodes[1], so the expectation is updated in place.
movedNode := expNodes[1]
movedNode.Address = expNodes[2].Address
if err := g.SetNodeDescriptor(movedNode); err != nil {
t.Fatal(err)
}
// Block until the listener is notified of the changed descriptor.
<-c
require.Equal(t, expNodes, g.GetAllNodeDescriptors())

// Unregister the listener.
unregister()
require.Equal(t, 0, len(g.nodeDescsChannels))
}
1 change: 1 addition & 0 deletions pkg/gossip/infostore.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func (is *infoStore) registerCallback(
if targetCB == cb {
numCBs := len(is.callbacks)
is.callbacks[i] = is.callbacks[numCBs-1]
is.callbacks[numCBs-1] = nil // for GC
is.callbacks = is.callbacks[:numCBs-1]
break
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,20 @@ func (n Node) Batch(
return &roachpb.BatchResponse{}, nil
}

// RangeLookup is an unimplemented stub on the test Node; both arguments are
// ignored and the method panics if it is ever called.
func (n Node) RangeLookup(
_ context.Context, _ *roachpb.RangeLookupRequest,
) (*roachpb.RangeLookupResponse, error) {
panic("unimplemented")
}

// RangeFeed is an unimplemented stub on the test Node; both arguments are
// ignored and the method panics if it is ever called.
func (n Node) RangeFeed(_ *roachpb.RangeFeedRequest, _ roachpb.Internal_RangeFeedServer) error {
panic("unimplemented")
}

// NodeInfo is an unimplemented stub on the test Node; both arguments are
// ignored and the method panics if it is ever called.
func (n Node) NodeInfo(_ *roachpb.NodeInfoRequest, _ roachpb.Internal_NodeInfoServer) error {
panic("unimplemented")
}

// TestSendToOneClient verifies that Send correctly sends a request
// to one server using the heartbeat RPC.
func TestSendToOneClient(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,23 @@ func (m *mockInternalClient) Batch(
return br, nil
}

// RangeLookup is part of the roachpb.InternalClient interface. The mock does
// not support it: the arguments are ignored and an error is always returned.
func (m *mockInternalClient) RangeLookup(
ctx context.Context, rl *roachpb.RangeLookupRequest, _ ...grpc.CallOption,
) (*roachpb.RangeLookupResponse, error) {
return nil, fmt.Errorf("unsupported RangeLookup call")
}

// RangeFeed is part of the roachpb.InternalClient interface. The mock does
// not support it: the arguments are ignored and an error is always returned.
func (m *mockInternalClient) RangeFeed(
ctx context.Context, in *roachpb.RangeFeedRequest, opts ...grpc.CallOption,
) (roachpb.Internal_RangeFeedClient, error) {
return nil, fmt.Errorf("unsupported RangeFeed call")
}

// NodeInfo is part of the roachpb.InternalClient interface. The mock does
// not support it: the arguments are ignored and an error is always returned.
func (m *mockInternalClient) NodeInfo(
ctx context.Context, args *roachpb.NodeInfoRequest, _ ...grpc.CallOption,
) (roachpb.Internal_NodeInfoClient, error) {
return nil, fmt.Errorf("unsupported NodeInfo call")
}
Loading

0 comments on commit 019b51d

Please sign in to comment.