Skip to content

Commit

Permalink
base: introduce SQLInstanceID (and container)
Browse files Browse the repository at this point in the history
With the advent of multi-tenancy, SQL servers can optionally be
decoupled from the (and share a) KV backend. As a consequence, the
NodeID may not be available and its use across all components that need
to function in a multi-tenant setup is deprecated. Yet, some components
require a similar unique identifier for the SQL server they're a part
of.

This new identifier is the SQLInstanceID. At the time of writing, since
we're not yet able to start true SQL tenant servers and only exercise
parts of this functionality in unit testing, it is a randomized integer.
It's possible that we'll replace it with a per-tenant or even global
counter in production, putting it truly on par with the NodeID, to be
able to give the simple guarantee that these IDs are never reused
between different incarnations of the SQL process for a given tenant.
This final decision is kicked down the road, see cockroachdb#48009.

This commit introduces an IDContainer which will aid this process by
wrapping both the NodeID and SQLInstanceID, very similar to the work
carried out for Gossip in cockroachdb#47972. We do not yet audit all of the call
sites to keep the initial diff container. Instead, IDContainer has a
Get() method that makes it a drop-in replacement for the previously used
NodeIDContainer. `Get ` will be removed in a future commit while moving
all callers to either the `Optional{,Err}` or `DeprecatedNodeID`
methods.

Release note: None
  • Loading branch information
tbg committed Apr 29, 2020
1 parent 41a361b commit 3275dc8
Show file tree
Hide file tree
Showing 17 changed files with 157 additions and 38 deletions.
105 changes: 105 additions & 0 deletions pkg/base/node_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ package base

import (
"context"
"fmt"
"strconv"
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand Down Expand Up @@ -66,3 +69,105 @@ func (n *NodeIDContainer) Set(ctx context.Context, val roachpb.NodeID) {
func (n *NodeIDContainer) Reset(val roachpb.NodeID) {
atomic.StoreInt32(&n.nodeID, int32(val))
}

// A SQLInstanceID is an ephemeral ID assigned to a running instance of the SQL
// server. This is distinct from a NodeID, which is a long-lived identifier
// assigned to a node in the KV layer which is unique across all KV nodes in the
// cluster and persists across restarts. Instead, a SQLInstance is similar to a
// process ID from the unix world: an integer assigned to the SQL server
// on process start which is unique across all SQL server processes running
// on behalf of the tenant, while the SQL server is running.
//
// NB: until https://github.com/cockroachdb/cockroach/issues/47899 is addressed,
// the properties of the SQLInstanceID hold trivially due to the constraint that
// only one SQL server must be running on behalf of the tenant at any given
// time. After that, it's likely that we'll allocate these IDs off a counter,
// so they will be completely unique (per tenant).
type SQLInstanceID int64

// IDContainer wraps a SQLInstanceID and optionally a NodeID.
type IDContainer struct {
w errorutil.TenantSQLDeprecatedWrapper // NodeID
sqlInstanceID SQLInstanceID
}

// NewIDContainer sets up an IDContainer wrapping the (positive) SQLInstanceID
// and a NodeID. See errorutil.TenantSQLDeprecatedWrapper for an explanation of
// the nodeIDExposed parameter.
//
// As a special case, a zero sqlInstanceID in conjunction with
// nodeIDExposed==true falls back to the NodeID in SQLInstanceID(). This is used
// in single-tenant deployments.
func NewIDContainer(
sqlInstanceID SQLInstanceID, nodeID *NodeIDContainer, nodeIDExposed bool,
) *IDContainer {
return &IDContainer{
w: errorutil.MakeTenantSQLDeprecatedWrapper(nodeID, nodeIDExposed),
sqlInstanceID: sqlInstanceID,
}
}

func (c *IDContainer) String() string {
var buf strings.Builder
fmt.Fprintf(&buf, "i%d", c.sqlInstanceID)
if n, ok := c.OptionalNodeID(); ok {
fmt.Fprintf(&buf, "@n%s,", n)
}
return ""
}

// OptionalNodeID returns the NodeID and true, if the former is exposed.
// Otherwise, returns zero and false.
func (c *IDContainer) OptionalNodeID() (roachpb.NodeID, bool) {
v, ok := c.w.Optional()
if !ok {
return 0, false
}
return v.(*NodeIDContainer).Get(), true
}

// OptionalNodeIDErr is like OptionalNodeID, but returns an error (referring to
// the optionally supplied Github issues) if the ID is not present.
func (c *IDContainer) OptionalNodeIDErr(issueNos ...int) (roachpb.NodeID, error) {
v, err := c.w.OptionalErr(issueNos...)
if err != nil {
return 0, err
}
return v.(*NodeIDContainer).Get(), nil
}

// DeprecatedNodeID returns the NodeID. This call is deprecated: removal of all
// call sites is the goal, at which point this method will be removed. Calls to
// this method reflect essential functionality which needs to be reworked in
// order to enable multi-tenancy.
func (c *IDContainer) DeprecatedNodeID(issueNo int) roachpb.NodeID {
return c.w.Deprecated(issueNo).(*NodeIDContainer).Get()
}

// SQLInstanceID returns the wrapped SQLInstanceID.
func (c *IDContainer) SQLInstanceID() SQLInstanceID {
if n, ok := c.OptionalNodeID(); ok {
return SQLInstanceID(n)
}
return c.sqlInstanceID
}

// Get is a temporary method to aid refactoring.
//
// TODO(tbg): remove.
func (c *IDContainer) Get() roachpb.NodeID {
// Silence staticcheck.
var _ = (*IDContainer)(nil).OptionalNodeID
var _ = (*IDContainer)(nil).OptionalNodeIDErr
var _ = (*IDContainer)(nil).SQLInstanceID
return c.DeprecatedNodeID(-12131415)

}

// TestingIDContainer is an IDContainer with hard-coded SQLInstanceID of 10 and
// NodeID of 1.
var TestingIDContainer = func() *IDContainer {
var c NodeIDContainer
c.Set(context.Background(), 1)
return NewIDContainer(10, &c, true /* exposed */)
}()
4 changes: 2 additions & 2 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type Registry struct {
db *kv.DB
ex sqlutil.InternalExecutor
clock *hlc.Clock
nodeID *base.NodeIDContainer
nodeID *base.IDContainer
settings *cluster.Settings
planFn planHookMaker
metrics Metrics
Expand Down Expand Up @@ -178,7 +178,7 @@ func MakeRegistry(
nl NodeLiveness,
db *kv.DB,
ex sqlutil.InternalExecutor,
nodeID *base.NodeIDContainer,
nodeID *base.IDContainer,
settings *cluster.Settings,
histogramWindowInterval time.Duration,
planFn planHookMaker,
Expand Down
7 changes: 4 additions & 3 deletions pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ func TestRegistryResumeExpiredLease(t *testing.T) {
const cancelInterval = time.Duration(math.MaxInt64)
const adoptInterval = time.Nanosecond

var c base.NodeIDContainer
c.Set(ctx, id)
idContainer := base.NewIDContainer(0, &c, true /* exposed */)
ac := log.AmbientContext{Tracer: tracing.NewTracer()}
nodeID := &base.NodeIDContainer{}
nodeID.Reset(id)
r := jobs.MakeRegistry(
ac, s.Stopper(), clock, nodeLiveness, db, s.InternalExecutor().(sqlutil.InternalExecutor),
nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS, "",
idContainer, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS, "",
)
if err := r.Start(ctx, s.Stopper(), cancelInterval, adoptInterval); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestRegistryCancelation(t *testing.T) {
mClock := hlc.NewManualClock(hlc.UnixNano())
clock := hlc.NewClock(mClock.UnixNano, time.Nanosecond)
registry := MakeRegistry(
log.AmbientContext{}, stopper, clock, nodeLiveness, db, nil /* ex */, FakeNodeID, cluster.NoSettings,
log.AmbientContext{}, stopper, clock, nodeLiveness, db, nil /* ex */, base.TestingIDContainer, cluster.NoSettings,
histogramWindowInterval, FakePHS, "")

const cancelInterval = time.Nanosecond
Expand Down
21 changes: 14 additions & 7 deletions pkg/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,10 +826,18 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
return ba.CreateReply(), nil
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
dbCtx := kv.DefaultDBContext()
dbCtx.NodeID = &base.NodeIDContainer{}
db := kv.NewDBWithContext(testutils.MakeAmbientCtx(), factory, clock, dbCtx)
setup := func(nodeID roachpb.NodeID) *kv.DB {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
dbCtx := kv.DefaultDBContext()
var c base.NodeIDContainer
if nodeID != 0 {
c.Set(context.Background(), nodeID)
}
dbCtx.NodeID = base.NewIDContainer(0, &c, true /* exposed */)

db := kv.NewDBWithContext(testutils.MakeAmbientCtx(), factory, clock, dbCtx)
return db
}
ctx := context.Background()

// Verify direct creation of Txns.
Expand All @@ -845,6 +853,7 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
}
for i, test := range directCases {
t.Run(fmt.Sprintf("direct-txn-%d", i), func(t *testing.T) {
db := setup(test.nodeID)
now := db.Clock().Now()
kvTxn := roachpb.MakeTransaction("unnamed", nil /*baseKey*/, roachpb.NormalUserPriority, now, db.Clock().MaxOffset().Nanoseconds())
txn := kv.NewTxnFromProto(ctx, db, test.nodeID, now, test.typ, &kvTxn)
Expand All @@ -865,9 +874,7 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
}
for i, test := range indirectCases {
t.Run(fmt.Sprintf("indirect-txn-%d", i), func(t *testing.T) {
if test.nodeID != 0 {
dbCtx.NodeID.Set(ctx, test.nodeID)
}
db := setup(test.nodeID)
if err := db.Txn(
ctx, func(_ context.Context, txn *kv.Txn) error {
ots := txn.TestingCloneTxn().ObservedTimestamps
Expand Down
8 changes: 5 additions & 3 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,20 @@ type DBContext struct {
UserPriority roachpb.UserPriority
// NodeID provides the node ID for setting the gateway node and avoiding
// clock uncertainty for root transactions started at the gateway.
NodeID *base.NodeIDContainer
NodeID *base.IDContainer
// Stopper is used for async tasks.
Stopper *stop.Stopper
}

// DefaultDBContext returns (a copy of) the default options for
// NewDBWithContext.
func DefaultDBContext() DBContext {
var c base.NodeIDContainer
return DBContext{
UserPriority: roachpb.NormalUserPriority,
NodeID: &base.NodeIDContainer{},
Stopper: stop.NewStopper(),
// TODO(tbg): this is ugly. Force callers to pass in an IDContainer.
NodeID: base.NewIDContainer(0, &c, true /* exposed */),
Stopper: stop.NewStopper(),
}
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
// bootstrapped; otherwise a new one is allocated in Node.
nodeIDContainer := &base.NodeIDContainer{}
cfg.AmbientCtx.AddLogTag("n", nodeIDContainer)
const sqlInstanceID = base.SQLInstanceID(0)
idContainer := base.NewIDContainer(sqlInstanceID, nodeIDContainer, true /* exposed */)

ctx := cfg.AmbientCtx.AnnotateCtx(context.Background())

Expand Down Expand Up @@ -329,7 +331,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
tcsFactory := kvcoord.NewTxnCoordSenderFactory(txnCoordSenderFactoryCfg, distSender)

dbCtx := kv.DefaultDBContext()
dbCtx.NodeID = nodeIDContainer
dbCtx.NodeID = idContainer
dbCtx.Stopper = stopper
db := kv.NewDBWithContext(cfg.AmbientCtx, tcsFactory, clock, dbCtx)

Expand Down Expand Up @@ -531,7 +533,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
nodeDialer: nodeDialer,
grpcServer: grpcServer.Server,
recorder: recorder,
nodeIDContainer: nodeIDContainer,
nodeIDContainer: idContainer,
externalStorage: externalStorage,
externalStorageFromURI: externalStorageFromURI,
isMeta1Leaseholder: node.stores.IsMeta1Leaseholder,
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type sqlServerOptionalArgs struct {
// For the temporaryObjectCleaner.
isMeta1Leaseholder func(hlc.Timestamp) (bool, error)
// DistSQL, lease management, and others want to know the node they're on.
nodeIDContainer *base.NodeIDContainer
nodeIDContainer *base.IDContainer

// Used by backup/restore.
externalStorage cloud.ExternalStorageFactory
Expand Down
10 changes: 6 additions & 4 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,11 @@ func testSQLServerArgs(ts *TestServer) sqlServerArgs {

dummyRecorder := &status.MetricsRecorder{}

var nodeIDContainer base.NodeIDContainer
const fakeNodeID = roachpb.NodeID(9999)
var c base.NodeIDContainer
c.Set(context.Background(), fakeNodeID)
const sqlInstanceID = base.SQLInstanceID(10001)
idContainer := base.NewIDContainer(sqlInstanceID, &c, false /* exposed */)

// We don't need this for anything except some services that want a gRPC
// server to register against (but they'll never get RPCs at the time of
Expand All @@ -493,7 +497,7 @@ func testSQLServerArgs(ts *TestServer) sqlServerArgs {
isMeta1Leaseholder: func(timestamp hlc.Timestamp) (bool, error) {
return false, errors.New("fake isMeta1Leaseholder")
},
nodeIDContainer: &nodeIDContainer,
nodeIDContainer: idContainer,
externalStorage: func(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) {
return nil, errors.New("fake external storage")
},
Expand All @@ -517,8 +521,6 @@ func testSQLServerArgs(ts *TestServer) sqlServerArgs {
func (ts *TestServer) StartTenant() (addr string, _ error) {
ctx := context.Background()
args := testSQLServerArgs(ts)
const nodeID = 9999
args.nodeIDContainer.Set(context.Background(), nodeID)
s, err := newSQLServer(ctx, args)
if err != nil {
return "", err
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ func startConnExecutor(
})
db := kv.NewDB(testutils.MakeAmbientCtx(), factory, clock)
st := cluster.MakeTestingClusterSettings()
nodeID := &base.NodeIDContainer{}
nodeID.Set(ctx, 1)
nodeID := base.TestingIDContainer
distSQLMetrics := execinfra.MakeDistSQLMetrics(time.Hour /* histogramWindow */)
cfg := &ExecutorConfig{
AmbientCtx: testutils.MakeAmbientCtx(),
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/distsql/inbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ func TestOutboxInboundStreamIntegration(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
ni := base.NodeIDContainer{}
ni.Set(ctx, 1)
st := cluster.MakeTestingClusterSettings()
mt := execinfra.MakeDistSQLMetrics(time.Hour /* histogramWindow */)
srv := NewServer(
Expand All @@ -66,7 +64,7 @@ func TestOutboxInboundStreamIntegration(t *testing.T) {
Settings: st,
Stopper: stopper,
Metrics: &mt,
NodeID: &ni,
NodeID: base.TestingIDContainer,
},
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func getMetricMeta(meta metric.Metadata, internal bool) metric.Metadata {
// NodeInfo contains metadata about the executing node and cluster.
type NodeInfo struct {
ClusterID func() uuid.UUID
NodeID *base.NodeIDContainer
NodeID *base.IDContainer
AdminURL func() *url.URL
PGURL func(*url.Userinfo) (*url.URL, error)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type ServerConfig struct {
Metrics *DistSQLMetrics

// NodeID is the id of the node on which this Server is running.
NodeID *base.NodeIDContainer
NodeID *base.IDContainer
ClusterID *base.ClusterIDContainer
ClusterName string

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func storedLeaseExpiration(expiration hlc.Timestamp) tree.DTimestamp {
// LeaseStore implements the operations for acquiring and releasing leases and
// publishing a new version of a descriptor. Exported only for testing.
type LeaseStore struct {
nodeIDContainer *base.NodeIDContainer
nodeIDContainer *base.IDContainer
db *kv.DB
clock *hlc.Clock
internalExecutor sqlutil.InternalExecutor
Expand Down Expand Up @@ -1426,7 +1426,7 @@ const leaseConcurrencyLimit = 5
// stopper is used to run async tasks. Can be nil in tests.
func NewLeaseManager(
ambientCtx log.AmbientContext,
nodeIDContainer *base.NodeIDContainer,
nodeIDContainer *base.IDContainer,
db *kv.DB,
clock *hlc.Clock,
internalExecutor sqlutil.InternalExecutor,
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ func (t *leaseTest) mustPublish(ctx context.Context, nodeID uint32, descID sqlba
func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager {
mgr := t.nodes[nodeID]
if mgr == nil {
nc := &base.NodeIDContainer{}
nc.Set(context.TODO(), roachpb.NodeID(nodeID))
var c base.NodeIDContainer
c.Set(context.Background(), roachpb.NodeID(nodeID))
nc := base.NewIDContainer(0, &c, true /* exposed*/)
// Hack the ExecutorConfig that we pass to the LeaseManager to have a
// different node id.
cfgCpy := t.server.ExecutorConfig().(sql.ExecutorConfig)
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@ import (
func makeTestPlanner() *planner {
// Initialize an Executorconfig sufficiently for the purposes of creating a
// planner.
var nodeID base.NodeIDContainer
nodeID.Set(context.TODO(), 1)
execCfg := ExecutorConfig{
NodeInfo: NodeInfo{
NodeID: &nodeID,
NodeID: base.TestingIDContainer,
ClusterID: func() uuid.UUID {
return uuid.MakeV4()
},
Expand Down
6 changes: 5 additions & 1 deletion pkg/testutils/localtestcluster/local_test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
dbCtx.Stopper = ltc.Stopper
ltc.DBContext = &dbCtx
}
ltc.DBContext.NodeID.Set(context.Background(), nodeID)
{
var c base.NodeIDContainer
c.Set(context.Background(), nodeID)
ltc.DBContext.NodeID = base.NewIDContainer(0, &c, true /* exposed */)
}
ltc.DB = kv.NewDBWithContext(cfg.AmbientCtx, factory, ltc.Clock, *ltc.DBContext)
transport := kvserver.NewDummyRaftTransport(cfg.Settings)
// By default, disable the replica scanner and split queue, which
Expand Down

0 comments on commit 3275dc8

Please sign in to comment.