Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…db#96871 cockroachdb#96882

96141: cli: add `--disable-max-offset-check` flag r=erikgrinaker a=erikgrinaker

**util/hlc: add `Clock.ToleratedOffset()`**

This patch adds `Clock.ToleratedOffset()` which returns the tolerated clock offset as measured via RPC heartbeats before the node will self-terminate. The existing hard-coded 80% tolerated clock skew is moved from `RemoteOffset.isHealth()` to `hlc.NewClock()` and passed in via `rpc.ContextOptions.MaxOffset`.

As a consequence of setting `rpc.ContextOptions.MaxOffset` to `ToleratedOffset()` rather than `MaxOffset()`, we now discard RPC clock measurements when the RPC heartbeat round-trip latency exceeds 1.6x of the max offset (2x the tolerated offset), rather than 2x of the max offset. A later commit will rename the `MaxOffset` option to `ToleratedOffset`.

Release note: None
  
**rpc: rename `MaxOffset` to `ToleratedOffset`**

Mechanical rename, except for changes to a couple of log/error messages.

Release note: None
  
**rpc: explicitly allow `ToleratedOffset = 0`**

This patch explicitly allows `ToleratedOffset = 0`, which disables RPC clock offset checking and is often done in tests. The relevant code is also unindented. There are no behavioral changes.

Release note: None
  
**hlc: add and use `NewClockForTesting()`**

This patch adds `NewClockForTesting()`, which returns a clock with no max offset or tolerated offset. It takes a wall clock as a parameter, defaulting to the system clock if `nil`.

All appropriate tests have been updated to use it.

Release note: None

**hlc: add `toleratedOffset` parameter for `NewClock()`**

The tolerated offset is computed via `BaseConfig.ToleratedOffset()`, preserving the previous value of 80% of `MaxOffset`.

Epic: none
Release note: None
  
**cli: add `--disable-max-offset-check` flag**

This patch adds a `--disable-max-offset-check` flag, which disables the clock offset check that will self-terminate the node if its offset to the rest of the cluster (as measured via RPC heartbeats) exceeds `--max-offset`.

This is motivated by deployments with reliable, high-precision clock infrastructure where `--max-offset` may be set very low (e.g. 10 ms) to reduce undertainty restarts and write latency for global tables. With such low offset limits, RPC latency spikes could cause spurious node restarts. This flag allows operators to assume responsibility for ensuring real clock skew never exceeds `--max-offset`.

Resolves cockroachdb#94999.
Epic: none

Release note (ops change): The flag `--disable-max-offset-check` has been added to disable node self-termination when it detects clock skew with the rest of the cluster beyond `--max-offset`. The operator assumes responsibility for ensuring that real clock skew never exceeds `--max-offset`.

96786: clusterversion: remove replication version gates r=erikgrinaker a=erikgrinaker

**clusterversion: remove `AddSSTableTombstones` version gate**

Epic: none
Release note: None
  
**clusterversion: remove `GCHintInReplicaState` cluster version**

Also removes `roachpb.SplitTrigger.WriteGCHint` since we now always write GC hints.

Resolves cockroachdb#96760.

Epic: none
Release note: None

96857: server: fix instance IDs for shared-process tenant servers to node IDs  r=dhartunian a=stevendanna

We have a number of callers that assume the node ID and instance ID
can be used interchangably. In shared-process mode, this has revealed
a number of issues that we need to investigate and resolve.

This PR, however, papers over those issues to an extent by allowing
the shared process tenant servers to inherit the node ID from the
server that is starting it. This then ensures that their SQL instance
ID is the same as their node ID.

I've updated some of the names in NodeIDContainer to make the intended
usage more clear.

Fixes cockroachdb#84602

Epic: CRDB-14537

Release note: None

96871: kvserver: make clearSubsumedReplicaDiskData movable r=pavelkalinnikov a=tbg

We'd like for this to move to `kvstorage` so that it can be unit tested and
teased apart for CRDB-220.

Epic: CRDB-220
Release note: None


96882: kvserver: extract writeUnreplicatedSST r=pavelkalinnikov a=tbg

This PR extracts a standalone method `writeUnreplicatedSST` that can be moved
to `kvstorage` at an appropriate time.

This is a sister PR to cockroachdb#96871, which does the same to some other SSTs created
in `applySnapshot`.

Epic: CRDB-220
Release note: None


Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
4 people committed Feb 13, 2023
6 parents 9c41c48 + c993304 + 749a1c7 + 9e24c02 + aa870f8 + 31ae36f commit 18c5a38
Show file tree
Hide file tree
Showing 124 changed files with 885 additions and 790 deletions.
12 changes: 6 additions & 6 deletions pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,12 @@ func (c *Cluster) makeNode(ctx context.Context, nodeIdx int, cfg NodeConfig) (*N
Insecure: true,
}
rpcCtx := rpc.NewContext(ctx, rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
Config: baseCtx,
Clock: &timeutil.DefaultTimeSource{},
MaxOffset: 0,
Stopper: c.stopper,
Settings: cluster.MakeTestingClusterSettings(),
TenantID: roachpb.SystemTenantID,
Config: baseCtx,
Clock: &timeutil.DefaultTimeSource{},
ToleratedOffset: 0,
Stopper: c.stopper,
Settings: cluster.MakeTestingClusterSettings(),

ClientOnly: true,
})
Expand Down
47 changes: 22 additions & 25 deletions pkg/base/node_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ type NodeIDContainer struct {
// sqlInstance is set). It is accessed atomically.
nodeID int32

// sqlInstance is set to true when the node is a SQL instance.
sqlInstance bool
// sqlInstance is set to true when the node is meant to be a
// standalone SQL instance.
standaloneSQLInstance bool

// If nodeID has been set, str represents nodeID converted to string. We
// precompute this value to speed up String() and keep it from allocating
Expand All @@ -51,7 +52,7 @@ type NodeIDContainer struct {
func (n *NodeIDContainer) String() string {
s := n.str.Load()
if s == nil {
if n.sqlInstance {
if n.standaloneSQLInstance {
return "sql?"
}
return "?"
Expand Down Expand Up @@ -81,22 +82,19 @@ func (n *NodeIDContainer) Get() roachpb.NodeID {

// Set sets the current node ID. If it is already set, the value must match.
func (n *NodeIDContainer) Set(ctx context.Context, val roachpb.NodeID) {
n.setInternal(ctx, int32(val), false)
n.setInternal(ctx, int32(val))
}

func (n *NodeIDContainer) setInternal(ctx context.Context, val int32, sqlInstance bool) {
func (n *NodeIDContainer) setInternal(ctx context.Context, val int32) {
if val <= 0 {
panic(errors.AssertionFailedf("trying to set invalid NodeID: %d", val))
}
oldVal := atomic.SwapInt32(&n.nodeID, val)
if n.sqlInstance != sqlInstance {
serverIs := map[bool]redact.SafeString{false: "SQL instance", true: "node"}
panic(errors.AssertionFailedf("server is a %v, cannot set %v ID", serverIs[!n.sqlInstance], serverIs[sqlInstance]))
} else if oldVal != 0 && oldVal != val {
if oldVal != 0 && oldVal != val {
panic(errors.AssertionFailedf("different IDs set: %d, then %d", oldVal, val))
}
prefix := ""
if sqlInstance {
if n.standaloneSQLInstance {
prefix = "sql"
}
n.str.Store(prefix + strconv.Itoa(int(val)))
Expand Down Expand Up @@ -192,42 +190,41 @@ type SQLIDContainer NodeIDContainer
// NewSQLIDContainerForNode sets up a SQLIDContainer which serves the underlying
// NodeID as the SQL instance ID.
func NewSQLIDContainerForNode(nodeID *NodeIDContainer) *SQLIDContainer {
if nodeID.sqlInstance {
if nodeID.standaloneSQLInstance {
// This assertion exists to prevent misuse of the API, where a
// caller would call NewSQLIDContainerForNode() once, cast the
// result type to `*NodeIDContainer`, then mistakenly call
// NewSQLIDContainerForNode() again.
panic(errors.AssertionFailedf("programming error: container is already for a SQL instance"))
panic(errors.AssertionFailedf("programming error: container is already for a standalone SQL instance"))
}
return (*SQLIDContainer)(nodeID)
}

// SwitchToSQLIDContainer changes a NodeIDContainer to become able to
// store SQL instance IDs. After it has been switched, the original
// container will report the SQL instance ID value as NodeID via
// its Get() method.
func (n *NodeIDContainer) SwitchToSQLIDContainer() *SQLIDContainer {
// store SQL instance IDs for standalone SQL instances.
//
// After it has been switched, the original container will report the
// SQL instance ID value as NodeID via its Get() method, under the
// assumption that anything using that ID actually needs the SQL
// Instance ID.
func (n *NodeIDContainer) SwitchToSQLIDContainerForStandaloneSQLInstance() *SQLIDContainer {
sc := NewSQLIDContainerForNode(n)
sc.sqlInstance = true
sc.standaloneSQLInstance = true
return sc
}

// SetSQLInstanceID sets the SQL instance ID. It returns an error if
// we attempt to set an instance ID when the nodeID has already been
// initialized.
func (c *SQLIDContainer) SetSQLInstanceID(ctx context.Context, sqlInstanceID SQLInstanceID) error {
if !c.sqlInstance {
return errors.New("attempting to initialize instance ID when node ID is set")
}

(*NodeIDContainer)(c).setInternal(ctx, int32(sqlInstanceID), true)
(*NodeIDContainer)(c).setInternal(ctx, int32(sqlInstanceID))
return nil
}

// OptionalNodeID returns the NodeID and true, if the former is exposed.
// Otherwise, returns zero and false.
func (c *SQLIDContainer) OptionalNodeID() (roachpb.NodeID, bool) {
if (*NodeIDContainer)(c).sqlInstance {
if (*NodeIDContainer)(c).standaloneSQLInstance {
return 0, false
}
return (*NodeIDContainer)(c).Get(), true
Expand All @@ -236,7 +233,7 @@ func (c *SQLIDContainer) OptionalNodeID() (roachpb.NodeID, bool) {
// OptionalNodeIDErr is like OptionalNodeID, but returns an error (referring to
// the optionally supplied GitHub issues) if the ID is not present.
func (c *SQLIDContainer) OptionalNodeIDErr(issue int) (roachpb.NodeID, error) {
if (*NodeIDContainer)(c).sqlInstance {
if (*NodeIDContainer)(c).standaloneSQLInstance {
return 0, errorutil.UnsupportedWithMultiTenancy(issue)
}
return (*NodeIDContainer)(c).Get(), nil
Expand All @@ -255,7 +252,7 @@ func (c *SQLIDContainer) String() string { return (*NodeIDContainer)(c).String()
// TestingIDContainer is an SQLIDContainer with hard-coded SQLInstanceID of 10.
var TestingIDContainer = func() *SQLIDContainer {
var c NodeIDContainer
sc := c.SwitchToSQLIDContainer()
sc := c.SwitchToSQLIDContainerForStandaloneSQLInstance()
if err := sc.SetSQLInstanceID(context.Background(), 10); err != nil {
panic(err)
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/blobs/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -60,7 +59,7 @@ func BenchmarkStreamingReadFile(b *testing.B) {
defer cleanUpFn()

ctx := context.Background()
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
rpcContext.TestingAllowNamedRPCToAnonymousServer = true

Expand Down Expand Up @@ -119,7 +118,7 @@ func BenchmarkStreamingWriteFile(b *testing.B) {
defer cleanUpFn()

ctx := context.Background()
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
rpcContext.TestingAllowNamedRPCToAnonymousServer = true

Expand Down
11 changes: 5 additions & 6 deletions pkg/blobs/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/blobs/blobspb"
Expand Down Expand Up @@ -112,7 +111,7 @@ func TestBlobClientReadFile(t *testing.T) {
defer cleanUpFn()

ctx := context.Background()
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
rpcContext.TestingAllowNamedRPCToAnonymousServer = true

Expand Down Expand Up @@ -206,7 +205,7 @@ func TestBlobClientWriteFile(t *testing.T) {
defer cleanUpFn()

ctx := context.Background()
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
rpcContext.TestingAllowNamedRPCToAnonymousServer = true

Expand Down Expand Up @@ -288,7 +287,7 @@ func TestBlobClientList(t *testing.T) {
defer cleanUpFn()

ctx := context.Background()
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
rpcContext.TestingAllowNamedRPCToAnonymousServer = true

Expand Down Expand Up @@ -403,7 +402,7 @@ func TestBlobClientDeleteFrom(t *testing.T) {
defer cleanUpFn()

ctx := context.Background()
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
rpcContext.TestingAllowNamedRPCToAnonymousServer = true

Expand Down Expand Up @@ -481,7 +480,7 @@ func TestBlobClientStat(t *testing.T) {
defer cleanUpFn()

ctx := context.Background()
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
rpcContext.TestingAllowNamedRPCToAnonymousServer = true

Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/blobs"
Expand Down Expand Up @@ -193,7 +192,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
sstFile := &storage.MemFile{}
sst := storage.MakeBackupSSTWriter(ctx, cs, sstFile)
defer sst.Close()
ts := hlc.NewClockWithSystemTimeSource(time.Nanosecond).Now( /* maxOffset */ )
ts := hlc.NewClockForTesting(nil).Now()
value := roachpb.MakeValueFromString("bar")
for _, idx := range offsets {
key := keySlice[idx]
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func BenchmarkChangefeedTicks(b *testing.B) {
// data every time it's called, but that's a little unsatisfying. Instead,
// wait for each batch to come out of the feed before advancing the
// timestamp.
feedClock := hlc.NewClock(&mockClock{ts: timestamps}, time.Nanosecond /* maxOffset */)
feedClock := hlc.NewClockForTesting(&mockClock{ts: timestamps})
runBench(b, feedClock)
})
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2299,8 +2299,7 @@ func fetchDescVersionModificationTime(
MVCCFilter: roachpb.MVCCFilter_All,
StartTime: hlc.Timestamp{},
}
clock := hlc.NewClockWithSystemTimeSource(time.Minute /* maxOffset */)
hh := roachpb.Header{Timestamp: clock.Now()}
hh := roachpb.Header{Timestamp: hlc.NewClockForTesting(nil).Now()}
res, pErr := kv.SendWrappedWith(context.Background(),
s.SystemServer.DB().NonTransactionalSender(), hh, req)
if pErr != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestCanSendToFollower(t *testing.T) {
skip.UnderDeadlock(t, "test is flaky under deadlock+stress")

ctx := context.Background()
clock := hlc.NewClockWithSystemTimeSource(base.DefaultMaxClockOffset /* maxOffset */)
clock := hlc.NewClockWithSystemTimeSource(base.DefaultMaxClockOffset, base.DefaultMaxClockOffset)
stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0)
current := clock.Now()
future := clock.Now().Add(2*clock.MaxOffset().Nanoseconds(), 0)
Expand Down Expand Up @@ -514,7 +514,7 @@ func TestOracle(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClockWithSystemTimeSource(base.DefaultMaxClockOffset /* maxOffset */)
clock := hlc.NewClockWithSystemTimeSource(base.DefaultMaxClockOffset, base.DefaultMaxClockOffset)
stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0)
current := clock.Now()
future := clock.Now().Add(2*clock.MaxOffset().Nanoseconds(), 0)
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestConnectorGossipSubscription(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestConnectorRangeLookup(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestConnectorRetriesUnreachable(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestConnectorRetriesError(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)

// Function to create rpc server that would delegate to gossip and range lookup
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvtenantccl/setting_overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestConnectorSettingOverrides(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)
Expand Down
24 changes: 12 additions & 12 deletions pkg/ccl/oidcccl/authentication_oidc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func TestOIDCBadRequestIfDisabled(t *testing.T) {
newRPCContext := func(cfg *base.Config) *rpc.Context {
return rpc.NewContext(ctx,
rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
Config: cfg,
Clock: &timeutil.DefaultTimeSource{},
MaxOffset: 1,
Stopper: s.Stopper(),
Settings: s.ClusterSettings(),
TenantID: roachpb.SystemTenantID,
Config: cfg,
Clock: &timeutil.DefaultTimeSource{},
ToleratedOffset: 1,
Stopper: s.Stopper(),
Settings: s.ClusterSettings(),

ClientOnly: true,
})
Expand Down Expand Up @@ -90,12 +90,12 @@ func TestOIDCEnabled(t *testing.T) {

newRPCContext := func(cfg *base.Config) *rpc.Context {
return rpc.NewContext(ctx, rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
Config: cfg,
Clock: &timeutil.DefaultTimeSource{},
MaxOffset: 1,
Stopper: s.Stopper(),
Settings: s.ClusterSettings(),
TenantID: roachpb.SystemTenantID,
Config: cfg,
Clock: &timeutil.DefaultTimeSource{},
ToleratedOffset: 1,
Stopper: s.Stopper(),
Settings: s.ClusterSettings(),

ClientOnly: true,
})
Expand Down
Loading

0 comments on commit 18c5a38

Please sign in to comment.