Skip to content

Commit

Permalink
[wip] gossip: use ptp clock when configured
Browse files Browse the repository at this point in the history
Before this commit we saw "info not fresh" errors related to gossip
while running the ptp roachtest introduced in #129123.  Since gossip
takes walltimes of the local clock and shoves them into HLC timestamps,
it's likely that there are places where we might end up comparing these
to timestamps sourced from a ptp clock, if the system is configured with
one. If these two clocks don't agree, this is problematic.
  • Loading branch information
tbg committed Sep 11, 2024
1 parent c481f42 commit d4af16a
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 25 deletions.
5 changes: 3 additions & 2 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,10 @@ func New(
stopper *stop.Stopper,
registry *metric.Registry,
locality roachpb.Locality,
now func() int64,
) *Gossip {
g := &Gossip{
server: newServer(ambient, clusterID, nodeID, stopper, registry),
server: newServer(ambient, clusterID, nodeID, stopper, registry, now),
Connected: make(chan struct{}),
outgoing: makeNodeSet(minPeers, metric.NewGauge(MetaConnectionsOutgoingGauge)),
bootstrapping: map[string]struct{}{},
Expand Down Expand Up @@ -339,7 +340,7 @@ func NewTestWithLocality(
n := &base.NodeIDContainer{}
var ac log.AmbientContext
ac.AddLogTag("n", n)
gossip := New(ac, c, n, stopper, registry, locality)
gossip := New(ac, c, n, stopper, registry, locality, func() int64 { return timeutil.Now().UnixNano() })
if nodeID != 0 {
n.Set(context.TODO(), nodeID)
}
Expand Down
24 changes: 14 additions & 10 deletions pkg/gossip/infostore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type callback struct {
type infoStore struct {
log.AmbientContext

now func() int64

nodeID *base.NodeIDContainer
stopper *stop.Stopper
metrics Metrics
Expand Down Expand Up @@ -88,16 +90,16 @@ var errNotFresh = errors.New("info not fresh")
// newly created value in the event one is created within the same
// nanosecond. Really unlikely except for the case of unittests, but
// better safe than sorry.
func monotonicUnixNano() int64 {
func monotonicUnixNano(now func() int64) int64 {
monoTime.Lock()
defer monoTime.Unlock()

now := timeutil.Now().UnixNano()
if now <= monoTime.last {
now = monoTime.last + 1
wt := now()
if wt <= monoTime.last {
wt = monoTime.last + 1
}
monoTime.last = now
return now
monoTime.last = wt
return wt
}

// ratchetMonotonic increases the monotonic clock to be at least v. Used to
Expand Down Expand Up @@ -163,9 +165,11 @@ func newInfoStore(
nodeAddr util.UnresolvedAddr,
stopper *stop.Stopper,
metrics Metrics,
now func() int64,
) *infoStore {
is := &infoStore{
AmbientContext: ambient,
now: now,
nodeID: nodeID,
stopper: stopper,
metrics: metrics,
Expand Down Expand Up @@ -209,7 +213,7 @@ func (is *infoStore) newInfo(val []byte, ttl time.Duration) *Info {
if nodeID == 0 {
panic("gossip infostore's NodeID is 0")
}
now := monotonicUnixNano()
now := monotonicUnixNano(is.now)
ttlStamp := now + int64(ttl)
if ttl == 0 {
ttlStamp = math.MaxInt64
Expand All @@ -227,7 +231,7 @@ func (is *infoStore) newInfo(val []byte, ttl time.Duration) *Info {
func (is *infoStore) getInfo(key string) *Info {
if info, ok := is.Infos[key]; ok {
// Check TTL and ignore if too old.
if !info.expired(monotonicUnixNano()) {
if !info.expired(monotonicUnixNano(is.now)) {
return info
}
}
Expand All @@ -253,7 +257,7 @@ func (is *infoStore) addInfo(key string, i *Info) error {
}
if i.OrigStamp == 0 {
i.Value.InitChecksum([]byte(key))
i.OrigStamp = monotonicUnixNano()
i.OrigStamp = monotonicUnixNano(is.now)
if highWaterStamp, ok := is.highWaterStamps[i.NodeID]; ok && highWaterStamp >= i.OrigStamp {
// Report both timestamps in the crash.
log.Fatalf(context.Background(),
Expand Down Expand Up @@ -397,7 +401,7 @@ func (is *infoStore) runCallbacks(key string, content roachpb.Value, callbacks .
// infoStore. If it is specified as false, the method will ignore expired infos
// without deleting them or modifying the infoStore.
func (is *infoStore) visitInfos(visitInfo func(string, *Info) error, deleteExpired bool) error {
now := monotonicUnixNano()
now := monotonicUnixNano(is.now)

if visitInfo != nil {
for k, i := range is.Infos {
Expand Down
11 changes: 8 additions & 3 deletions pkg/gossip/infostore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand All @@ -37,7 +38,10 @@ func newTestInfoStore() (*infoStore, *stop.Stopper) {
stopper := stop.NewStopper()
nc := &base.NodeIDContainer{}
nc.Set(context.Background(), 1)
is := newInfoStore(log.MakeTestingAmbientCtxWithNewTracer(), nc, emptyAddr, stopper, makeMetrics())
is := newInfoStore(
log.MakeTestingAmbientCtxWithNewTracer(), nc, emptyAddr, stopper, makeMetrics(),
func() int64 { return timeutil.Now().UnixNano() },
)
return is, stopper
}

Expand Down Expand Up @@ -249,11 +253,12 @@ func TestCombineInfosRatchetMonotonic(t *testing.T) {
is, stopper := newTestInfoStore()
defer stopper.Stop(context.Background())

clock := func() int64 { return timeutil.Now().UnixNano() }
// Generate an info with a timestamp in the future.
info := &Info{
NodeID: is.nodeID.Get(),
TTLStamp: math.MaxInt64,
OrigStamp: monotonicUnixNano() + int64(time.Hour),
OrigStamp: monotonicUnixNano(clock) + int64(time.Hour),
}
if !local {
info.NodeID++
Expand All @@ -280,7 +285,7 @@ func TestCombineInfosRatchetMonotonic(t *testing.T) {
var expectedLast int64
if local {
expectedLast = info.OrigStamp
if now := monotonicUnixNano(); now <= last {
if now := monotonicUnixNano(clock); now <= last {
t.Fatalf("expected mono-time to increase: %d <= %d", now, last)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/gossip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newServer(
nodeID *base.NodeIDContainer,
stopper *stop.Stopper,
registry *metric.Registry,
now func() int64,
) *server {
s := &server{
AmbientContext: ambient,
Expand All @@ -88,7 +89,7 @@ func newServer(
serverMetrics: makeMetrics(),
}

s.mu.is = newInfoStore(s.AmbientContext, nodeID, util.UnresolvedAddr{}, stopper, s.nodeMetrics)
s.mu.is = newInfoStore(s.AmbientContext, nodeID, util.UnresolvedAddr{}, stopper, s.nodeMetrics, now)
s.mu.incoming = makeNodeSet(minPeers, metric.NewGauge(MetaConnectionsIncomingGauge))
s.mu.nodeMap = make(map[util.UnresolvedAddr]serverInfo)
s.ready.Store(make(chan struct{}))
Expand Down
9 changes: 1 addition & 8 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf

nodeTombStorage, decommissionCheck := getPingCheckDecommissionFn(engines)

g := gossip.New(
cfg.AmbientCtx,
cfg.ClusterIDContainer,
nodeIDContainer,
stopper,
nodeRegistry,
cfg.Locality,
)
g := gossip.New(cfg.AmbientCtx, cfg.ClusterIDContainer, nodeIDContainer, stopper, nodeRegistry, cfg.Locality, clock.PhysicalNow)

tenantCapabilitiesTestingKnobs, _ := cfg.TestingKnobs.TenantCapabilitiesTestingKnobs.(*tenantcapabilities.TestingKnobs)
authorizer := tenantcapabilitiesauthorizer.New(cfg.Settings, tenantCapabilitiesTestingKnobs)
Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/localtestcluster/local_test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, initFactory InitFactoryFn) {

cfg.RPCContext.NodeID.Set(ctx, nodeID)
clusterID := cfg.RPCContext.StorageClusterID
ltc.Gossip = gossip.New(ambient, clusterID, nc, ltc.stopper, metric.NewRegistry(), roachpb.Locality{})
ltc.Gossip = gossip.New(ambient, clusterID, nc, ltc.stopper, metric.NewRegistry(), roachpb.Locality{}, ltc.Clock.PhysicalNow)
var err error
ltc.Eng, err = storage.Open(
ctx,
Expand Down

0 comments on commit d4af16a

Please sign in to comment.