Skip to content

Commit

Permalink
Merge #24606
Browse files Browse the repository at this point in the history
24606: cherrypick-2.0: util/hlc: option to panic on clock jumps r=bdarnell a=vijaykarthik-rubrik

A cluster setting is added to panic on clock jumps. The existing
safeguard in forward clock jumps is in periodic inter-node heartbeats,
so there is a window where anomalies could occur.

This change adds forward jump checks to HLC (and a background
goroutine to keep lastPhysicalTime up to date).

Cherrypicks #23717

Release note (general change): Added cluster settings for HLC to
panic on clock jumps

Co-authored-by: Vijay Karthik <[email protected]>
  • Loading branch information
craig[bot] and Vijay Karthik committed Apr 9, 2018
2 parents 4cb8a1a + f25b63e commit bcd1aba
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 1 deletion.
31 changes: 30 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ var (
"of the shutdown process",
0*time.Second,
)

forwardClockJumpCheckEnabled = settings.RegisterBoolSetting(
"server.clock.forward_jump_check_enabled",
"If enabled, forward clock jumps > max_offset/2 will cause a panic.",
false,
)
)

// Server is the cockroach server node.
Expand Down Expand Up @@ -155,10 +161,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
panic(errors.New("no tracer set in AmbientCtx"))
}

clock := hlc.NewClock(hlc.UnixNano, time.Duration(cfg.MaxOffset))
s := &Server{
st: st,
mux: http.NewServeMux(),
clock: hlc.NewClock(hlc.UnixNano, time.Duration(cfg.MaxOffset)),
clock: clock,
stopper: stopper,
cfg: cfg,
registry: metric.NewRegistry(),
Expand Down Expand Up @@ -695,6 +702,27 @@ func (s *singleListener) Addr() net.Addr {
return s.conn.LocalAddr()
}

// startMonitoringForwardClockJumps starts a background task to monitor forward
// clock jumps based on a cluster setting
func (s *Server) startMonitoringForwardClockJumps(ctx context.Context) {
forwardJumpCheckEnabled := make(chan bool, 1)
s.stopper.AddCloser(stop.CloserFn(func() { close(forwardJumpCheckEnabled) }))

forwardClockJumpCheckEnabled.SetOnChange(&s.st.SV, func() {
forwardJumpCheckEnabled <- forwardClockJumpCheckEnabled.Get(&s.st.SV)
})

if err := s.clock.StartMonitoringForwardClockJumps(
forwardJumpCheckEnabled,
time.NewTicker,
nil, /* tick callback */
); err != nil {
log.Fatal(ctx, err)
}

log.Info(ctx, "monitoring forward clock jumps based on server.clock.forward_jump_check_enabled")
}

// Start starts the server on the specified port, starts gossip and initializes
// the node using the engines from the server's context. This is complex since
// it sets up the listeners and the associated port muxing, but especially since
Expand All @@ -720,6 +748,7 @@ func (s *Server) Start(ctx context.Context) error {
ctx = s.AnnotateCtx(ctx)

startTime := timeutil.Now()
s.startMonitoringForwardClockJumps(ctx)

tlsConfig, err := s.cfg.GetServerTLSConfig()
if err != nil {
Expand Down
111 changes: 111 additions & 0 deletions pkg/util/hlc/hlc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
)

// TODO(Tobias): Figure out if it would make sense to save some
Expand Down Expand Up @@ -69,6 +70,14 @@ type Clock struct {
// lastPhysicalTime reports the last measured physical time. This
// is used to detect clock jumps.
lastPhysicalTime int64

// forwardClockJumpCheckEnabled specifies whether to panic on forward
// clock jumps
forwardClockJumpCheckEnabled bool

// isMonitoringForwardClockJumps is a flag to ensure that only one jump monitoring
// goroutine is running per clock
isMonitoringForwardClockJumps bool
}
}

Expand Down Expand Up @@ -125,6 +134,71 @@ func NewClock(physicalClock func() int64, maxOffset time.Duration) *Clock {
}
}

// toleratedForwardClockJump is the tolerated forward jump. Jumps greater
// than the returned value will cause if panic if forward clock jump check is
// enabled
func (c *Clock) toleratedForwardClockJump() time.Duration {
return c.maxOffset / 2
}

// StartMonitoringForwardClockJumps starts a goroutine to update the clock's
// forwardClockJumpCheckEnabled based on the values pushed in
// forwardClockJumpCheckEnabledCh.
//
// This also keeps lastPhysicalTime up to date to avoid spurious jump errors.
//
// A nil channel or a value of false pushed in forwardClockJumpCheckEnabledCh
// disables checking clock jumps between two successive reads of the physical
// clock.
//
// This should only be called once per clock, and will return an error if called
// more than once
//
// tickerFn is used to create a new ticker
//
// tickCallback is called whenever maxForwardClockJumpCh or a ticker tick is
// processed
func (c *Clock) StartMonitoringForwardClockJumps(
forwardClockJumpCheckEnabledCh <-chan bool,
tickerFn func(d time.Duration) *time.Ticker,
tickCallback func(),
) error {
alreadyMonitoring := c.setMonitoringClockJump()
if alreadyMonitoring {
return errors.New("clock jumps are already being monitored")
}

go func() {
// Create a ticker object which can be used in selects.
// This ticker is turned on / off based on forwardClockJumpCheckEnabledCh
ticker := tickerFn(time.Hour)
ticker.Stop()
refreshPhysicalNowItvl := c.toleratedForwardClockJump() / 2
for {
select {
case forwardClockJumpEnabled, ok := <-forwardClockJumpCheckEnabledCh:
ticker.Stop()
if !ok {
return
}
if forwardClockJumpEnabled {
// Forward jump check is enabled. Start the ticker
ticker = tickerFn(refreshPhysicalNowItvl)
}
c.setForwardJumpCheckEnabled(forwardClockJumpEnabled)
case <-ticker.C:
c.PhysicalNow()
}

if tickCallback != nil {
tickCallback()
}
}
}()

return nil
}

// MaxOffset returns the maximal clock offset to any node in the cluster.
//
// A value of 0 means offset checking is disabled.
Expand All @@ -143,6 +217,18 @@ func (c *Clock) getPhysicalClockLocked() int64 {
c.mu.monotonicityErrorsCount++
log.Warningf(context.TODO(), "backward time jump detected (%f seconds)", float64(-interval)/1e9)
}

if c.mu.forwardClockJumpCheckEnabled {
toleratedForwardClockJump := c.toleratedForwardClockJump()
if int64(toleratedForwardClockJump) <= -interval {
log.Fatalf(
context.TODO(),
"detected forward time jump of %f seconds is not allowed with tolerance of %f seconds",
float64(-interval)/1e9,
float64(toleratedForwardClockJump)/1e9,
)
}
}
}

c.mu.lastPhysicalTime = newTime
Expand Down Expand Up @@ -251,3 +337,28 @@ func (c *Clock) UpdateAndCheckMaxOffset(rt Timestamp) (Timestamp, error) {
defer c.mu.Unlock()
return c.updateLocked(rt, false)
}

// lastPhysicalTime returns the last physical time
func (c *Clock) lastPhysicalTime() int64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.mu.lastPhysicalTime
}

// setForwardJumpCheckEnabled atomically sets forwardClockJumpCheckEnabled
func (c *Clock) setForwardJumpCheckEnabled(forwardJumpCheckEnabled bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.forwardClockJumpCheckEnabled = forwardJumpCheckEnabled
}

// setMonitoringClockJump atomically sets isMonitoringForwardClockJumps to true and
// returns the old value. This is used to ensure that only one monitoring
// goroutine is launched
func (c *Clock) setMonitoringClockJump() bool {
c.mu.Lock()
defer c.mu.Unlock()
isMonitoring := c.mu.isMonitoringForwardClockJumps
c.mu.isMonitoringForwardClockJumps = true
return isMonitoring
}
174 changes: 174 additions & 0 deletions pkg/util/hlc/hlc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ package hlc
import (
"context"
"fmt"
"os"
"regexp"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/assert"
)

type Event uint8
Expand Down Expand Up @@ -98,6 +102,176 @@ func TestHLCEqual(t *testing.T) {
}
}

// isErrSimilar returns true of the expected error is similar to the
// actual error
func isErrSimilar(expected *regexp.Regexp, actual error) bool {
if actual == nil {
return expected == nil
}
// actual != nil
return expected != nil && expected.FindString(actual.Error()) != ""
}

func TestHLCPhysicalClockJump(t *testing.T) {
var fatal bool
defer log.SetExitFunc(os.Exit)
log.SetExitFunc(func(r int) {
if r != 0 {
fatal = true
}
})

testCases := []struct {
name string
actualJump time.Duration
maxOffset time.Duration
isFatal bool
}{
{
name: "small forward jump",
actualJump: 50 * time.Millisecond,
maxOffset: 500 * time.Millisecond,
isFatal: false,
},
{
name: "half max offset jump",
actualJump: 250 * time.Millisecond,
maxOffset: 500 * time.Millisecond,
isFatal: true,
},
{
name: "large forward jump",
actualJump: 400 * time.Millisecond,
maxOffset: 500 * time.Millisecond,
isFatal: true,
},
{
name: "large forward jump large thresh",
actualJump: 400 * time.Millisecond,
maxOffset: 900 * time.Millisecond,
isFatal: false,
},
{
name: "small backward jump",
actualJump: -40 * time.Millisecond,
maxOffset: 500 * time.Millisecond,
isFatal: false,
},
{
name: "large backward jump",
actualJump: -700 * time.Millisecond,
maxOffset: 500 * time.Millisecond,
isFatal: false,
},
{
name: "large backward jump large thresh",
actualJump: -700 * time.Millisecond,
maxOffset: 900 * time.Millisecond,
isFatal: false,
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
a := assert.New(t)

m := NewManualClock(1)
c := NewClock(m.UnixNano, test.maxOffset)
var tickerDuration time.Duration
tickerCh := make(chan time.Time)
tickProcessedCh := make(chan struct{})
forwardJumpCheckEnabledCh := make(chan bool, 1)
defer close(forwardJumpCheckEnabledCh)

if err := c.StartMonitoringForwardClockJumps(
forwardJumpCheckEnabledCh,
func(d time.Duration) *time.Ticker {
tickerDuration = d
ticker := time.NewTicker(d)
ticker.Stop()
ticker.C = tickerCh
return ticker
},
func() {
tickProcessedCh <- struct{}{}
},
); err != nil {
t.Error(err)
return
}

if err := c.StartMonitoringForwardClockJumps(
forwardJumpCheckEnabledCh,
time.NewTicker,
nil, /* tick callback */
); !isErrSimilar(regexp.MustCompile("already being monitored"), err) {
t.Error("expected an error when starting monitor goroutine twice")
}

fatal = false
t0 := c.Now()
a.Equal(false, fatal)

// forward jump check should be disabled unless set to true. This should
// not fatal even though it is a large jump
m.Increment(int64(test.maxOffset))
fatal = false
t1 := c.Now()
a.True(t0.Less(t1), fmt.Sprintf("expected %+v < %+v", t0, t1))
a.Equal(false, fatal)

forwardJumpCheckEnabledCh <- true
<-tickProcessedCh

m.Increment(int64(test.actualJump))
tickerCh <- timeutil.Now()
<-tickProcessedCh

fatal = false
t2 := c.Now()
a.True(t1.Less(t2), fmt.Sprintf("expected %+v < %+v", t1, t2))
// This should not fatal as tickerCh has ticked
a.Equal(false, fatal)
// After ticker ticks, last physical time should be equal to physical now
lastPhysicalTime := c.lastPhysicalTime()
physicalNow := c.PhysicalNow()
a.Equal(lastPhysicalTime, physicalNow)

// Potentially a fatal jump depending on the test case
fatal = false
m.Increment(int64(test.actualJump))
t3 := c.Now()
a.True(t2.Less(t3), fmt.Sprintf("expected %+v < %+v", t2, t3))
a.Equal(test.isFatal, fatal)

a.True(
tickerDuration <= test.maxOffset,
fmt.Sprintf(
"ticker duration %+v should be less than max jump %+v",
tickerDuration,
test.maxOffset,
),
)

// A jump by maxOffset is surely fatal
fatal = false
m.Increment(int64(test.maxOffset))
t4 := c.Now()
a.True(t3.Less(t4), fmt.Sprintf("expected %+v < %+v", t3, t4))
a.Equal(true, fatal)

// disable forward jump check
forwardJumpCheckEnabledCh <- false
<-tickProcessedCh
fatal = false
m.Increment(int64(test.actualJump))
t5 := c.Now()
a.True(t4.Less(t5), fmt.Sprintf("expected %+v < %+v", t4, t5))
a.Equal(false, fatal)
})
}
}

// TestHLCClock performs a complete test of all basic phenomena,
// including backward jumps in local physical time and clock offset.
func TestHLCClock(t *testing.T) {
Expand Down

0 comments on commit bcd1aba

Please sign in to comment.