Skip to content

Commit

Permalink
Merge #103582
Browse files Browse the repository at this point in the history
103582: server: call VerifyClockOffset directly r=aliher1911 a=tbg

Extracted from #99191.

----

Previously, the call to `VerifyClockOffset` was hidden inside the
`HeartbeatCB` callback on the RPC context. Let's just call it directly.

The callback was masking the fact that a failed clock offset check is fatal
only when starting a `Server` (i.e. outside of pure unit tests). Now there's
an explicit `FatalOnOffsetViolation` field on the options to opt into the fatal error.

The check now also runs only when it is needed: previously it also ran when
dealing with blocking requests, which we didn't actually want. That was
harmless, since blocking requests don't change the state of the tracker, but
now we only check when something has changed, which is better.

Epic: None
Release note: None


Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed May 19, 2023
2 parents 080bf1a + 97a25b7 commit f2ff9b4
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 78 deletions.
21 changes: 10 additions & 11 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ type Context struct {

heartbeatInterval time.Duration
heartbeatTimeout time.Duration
HeartbeatCB func()

rpcCompression bool

Expand Down Expand Up @@ -483,12 +482,13 @@ func (c connKey) SafeFormat(p redact.SafePrinter, _ rune) {
// ContextOptions are passed to NewContext to set up a new *Context.
// All pointer fields and TenantID are required.
type ContextOptions struct {
TenantID roachpb.TenantID
Config *base.Config
Clock hlc.WallClock
ToleratedOffset time.Duration
Stopper *stop.Stopper
Settings *cluster.Settings
TenantID roachpb.TenantID
Config *base.Config
Clock hlc.WallClock
ToleratedOffset time.Duration
FatalOnOffsetViolation bool
Stopper *stop.Stopper
Settings *cluster.Settings
// OnIncomingPing is called when handling a PingRequest, after
// preliminary checks but before recording clock offset information.
// It can inject an error or modify the response.
Expand Down Expand Up @@ -2546,10 +2546,9 @@ func (rpcCtx *Context) runHeartbeat(
request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds()
}
rpcCtx.RemoteClocks.UpdateOffset(ctx, conn.remoteNodeID, request.Offset, pingDuration)
}

if cb := rpcCtx.HeartbeatCB; cb != nil {
cb()
if err := rpcCtx.RemoteClocks.VerifyClockOffset(ctx); err != nil && rpcCtx.FatalOnOffsetViolation {
log.Ops.Fatalf(ctx, "%v", err)
}
}

return nil
Expand Down
53 changes: 0 additions & 53 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,59 +112,6 @@ func newTestContext(
})
}

// TestHeartbeatCB checks that the HeartbeatCB callback installed on a client
// rpc Context fires at least once after the client dials a heartbeat server.
// The body is run via testutils.RunTrueAndFalse with the "compression" flag,
// which is applied to both the server and the client context.
func TestHeartbeatCB(t *testing.T) {
defer leaktest.AfterTest(t)()

testutils.RunTrueAndFalse(t, "compression", func(t *testing.T, compression bool) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

// Shared cluster ID by all RPC peers (this ensures that the peers
// don't talk to servers from unrelated tests by accident).
clusterID := uuid.MakeV4()

// Server side: a test context with a manual clock and a zero max offset.
clock := timeutil.NewManualTime(timeutil.Unix(0, 20))
maxOffset := time.Duration(0)
serverCtx := newTestContext(clusterID, clock, maxOffset, stopper)
serverCtx.rpcCompression = compression
const serverNodeID = 1
serverCtx.NodeID.Set(context.Background(), serverNodeID)
s := newTestServer(t, serverCtx)
// Register a heartbeat service backed by the server context's clock
// monitor, cluster ID, node ID and version.
RegisterHeartbeatServer(s, &HeartbeatService{
clock: clock,
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
version: serverCtx.Settings.Version,
})

// Start serving; the listener's address is what the client dials below.
ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
}
remoteAddr := ln.Addr().String()

// Clocks don't matter in this test.
clientCtx := newTestContext(clusterID, clock, maxOffset, stopper)
clientCtx.rpcCompression = compression

// ch is closed the first time the callback runs; the sync.Once guards
// against closing it twice if the callback is invoked repeatedly.
var once sync.Once
ch := make(chan struct{})

clientCtx.HeartbeatCB = func() {
once.Do(func() {
close(ch)
})
}

// Dial the server node and wait for the connection attempt to complete.
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

// Block until the callback has fired at least once.
// NOTE(review): there is no timeout on this receive, so the test would hang
// rather than fail if the callback were never invoked.
<-ch
})
}

// TestPingInterceptors checks that OnOutgoingPing and OnIncomingPing can inject errors.
func TestPingInterceptors(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
24 changes: 10 additions & 14 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
tenantCapabilitiesTestingKnobs, _ := cfg.TestingKnobs.TenantCapabilitiesTestingKnobs.(*tenantcapabilities.TestingKnobs)
authorizer := tenantcapabilitiesauthorizer.New(cfg.Settings, tenantCapabilitiesTestingKnobs)
rpcCtxOpts := rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
UseNodeAuth: true,
NodeID: cfg.IDContainer,
StorageClusterID: cfg.ClusterIDContainer,
Config: cfg.Config,
Clock: clock.WallClock(),
ToleratedOffset: clock.ToleratedOffset(),
Stopper: stopper,
Settings: cfg.Settings,
TenantID: roachpb.SystemTenantID,
UseNodeAuth: true,
NodeID: cfg.IDContainer,
StorageClusterID: cfg.ClusterIDContainer,
Config: cfg.Config,
Clock: clock.WallClock(),
ToleratedOffset: clock.ToleratedOffset(),
FatalOnOffsetViolation: true,
Stopper: stopper,
Settings: cfg.Settings,
OnOutgoingPing: func(ctx context.Context, req *rpc.PingRequest) error {
// Outgoing ping will block requests with codes.FailedPrecondition to
// notify caller that this replica is decommissioned but others could
Expand Down Expand Up @@ -344,11 +345,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
return rpcContext.VerifyDialback(ctx, req, resp, cfg.Locality)
}

rpcContext.HeartbeatCB = func() {
if err := rpcContext.RemoteClocks.VerifyClockOffset(ctx); err != nil {
log.Ops.Fatalf(ctx, "%v", err)
}
}
registry.AddMetricStruct(rpcContext.Metrics())

// Attempt to load TLS configs right away, failures are permanent.
Expand Down

0 comments on commit f2ff9b4

Please sign in to comment.