server: add a configuration to enable GC of system.rangelog
The system.rangelog table currently grows without bound. The rate of
growth is slow (as long as there is no replica rebalancing
thrashing), but it can still become a problem in long-running
clusters.

This commit adds cluster settings to specify the GC interval and TTL
for rows in system.rangelog.
By default, GC of system.rangelog is disabled.
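
As a rough usage sketch (not part of this change; the connection string and the
720h TTL below are placeholder values), rangelog GC could be enabled from a Go
client once these settings exist:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	// Placeholder connection string; adjust for the target cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A non-zero TTL enables GC; entries older than the TTL are then deleted
	// on every server.rangelog.gc_interval tick (30m by default).
	if _, err := db.Exec(`SET CLUSTER SETTING server.rangelog.ttl = '720h'`); err != nil {
		log.Fatal(err)
	}
	// Optionally adjust how often the GC worker runs.
	if _, err := db.Exec(`SET CLUSTER SETTING server.rangelog.gc_interval = '1h'`); err != nil {
		log.Fatal(err)
	}
}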

Fixes cockroachdb#21260

Release note: Add configuration to enable GC of system.rangelog
Vijay Karthik committed Oct 3, 2018
1 parent 8e130b7 commit 1ad4130
Showing 5 changed files with 282 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
@@ -49,6 +49,8 @@
<tr><td><code>server.failed_reservation_timeout</code></td><td>duration</td><td><code>5s</code></td><td>the amount of time to consider the store throttled for up-replication after a failed reservation call</td></tr>
<tr><td><code>server.heap_profile.max_profiles</code></td><td>integer</td><td><code>5</code></td><td>maximum number of profiles to be kept. Profiles with lower score are GC'ed, but latest profile is always kept</td></tr>
<tr><td><code>server.heap_profile.system_memory_threshold_fraction</code></td><td>float</td><td><code>0.85</code></td><td>fraction of system memory beyond which if Rss increases, then heap profile is triggered</td></tr>
<tr><td><code>server.rangelog.gc_interval</code></td><td>duration</td><td><code>30m0s</code></td><td>interval for running GC on rangelog. If server.rangelog.ttl is non-zero, range log entries older than server.rangelog.ttl are deleted</td></tr>
<tr><td><code>server.rangelog.ttl</code></td><td>duration</td><td><code>0s</code></td><td>if non-zero, range log entries older than this duration are deleted periodically based on server.rangelog.gc_interval</td></tr>
<tr><td><code>server.remote_debugging.mode</code></td><td>string</td><td><code>local</code></td><td>set to enable remote debugging, localhost-only or disable (any, local, off)</td></tr>
<tr><td><code>server.shutdown.drain_wait</code></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with the rest of the shutdown process</td></tr>
<tr><td><code>server.shutdown.query_wait</code></td><td>duration</td><td><code>10s</code></td><td>the server will wait for at least this amount of time for active queries to finish</td></tr>
106 changes: 106 additions & 0 deletions pkg/server/server.go
@@ -38,6 +38,7 @@ import (
"google.golang.org/grpc"

"github.com/cockroachdb/cmux"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -83,6 +84,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

const (
// defaultRangeLogGCInterval is the default interval at which to run GC on the range log.
defaultRangeLogGCInterval = 30 * time.Minute
)

var (
// Allocation pool for gzipResponseWriters.
gzipResponseWriterPool sync.Pool
@@ -120,6 +126,25 @@ var (
"feature.",
0,
)

// RangeLogTTL is the TTL for rows in system.rangelog. If non-zero, range log
// entries are periodically garbage collected. The period is given by
// server.rangelog.gc_interval.
RangeLogTTL = settings.RegisterDurationSetting(
"server.rangelog.ttl",
"if non zero, range log entries older than this duration are deleted periodically "+
"based on storage.rangelog.gc_interval",
0,
)

// RangeLogGCInterval is the interval between subsequent runs of gc on
// system.rangelog.
RangeLogGCInterval = settings.RegisterDurationSetting(
"server.rangelog.gc_interval",
"interval for running gc on rangelog. If storage.rangelog.ttl is non zero,"+
"range log entries older than server.rangelog.ttl are deleted",
defaultRangeLogGCInteval,
)
)

// TODO(peter): Until go1.11, ServeMux.ServeHTTP was not safe to call
@@ -1037,6 +1062,85 @@ func (s *Server) startPersistingHLCUpperBound(
)
}

// gcRangeLog deletes entries in system.rangelog that are older than the given
// cutoffTimestamp.
func (s *Server) gcRangeLog(ctx context.Context, cutoffTimestamp time.Time) (int, error) {
const deleteStmt = `DELETE FROM system.rangelog WHERE timestamp <= $1 LIMIT 1000`
var totalRowsAffected int
var rowsAffected int
var err error
for {
err = s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
rowsAffected, err = s.internalExecutor.Exec(
ctx,
"rangelog-gc",
txn,
deleteStmt,
cutoffTimestamp,
)
return err
})
totalRowsAffected += rowsAffected
if err != nil {
return totalRowsAffected, err
}
if rowsAffected == 0 {
return totalRowsAffected, nil
}
}
}

// startRangeLogGC starts a worker which periodically GCs system.rangelog.
// The period is controlled by server.rangelog.gc_interval and the TTL is
// controlled by server.rangelog.ttl.
func (s *Server) startRangeLogGC(ctx context.Context) {
s.stopper.RunWorker(ctx, func(ctx context.Context) {
intervalChangeCh := make(chan time.Duration)
interval := RangeLogGCInterval.Get(&s.cfg.Settings.SV)
RangeLogGCInterval.SetOnChange(&s.cfg.Settings.SV, func() {
intervalChangeCh <- RangeLogGCInterval.Get(&s.cfg.Settings.SV)
})
t := time.NewTimer(interval)
defer t.Stop()
for {
select {
case interval = <-intervalChangeCh:
if !t.Stop() {
<-t.C
}
t.Reset(interval)
case <-t.C:
ttl := RangeLogTTL.Get(&s.cfg.Settings.SV)
if ttl > 0 {
cutoffTimestamp := timeutil.Unix(0, s.db.Clock().PhysicalNow()-int64(ttl))
if rowsAffected, err := s.gcRangeLog(ctx, cutoffTimestamp); err != nil {
log.Errorf(
ctx,
"error garbage collecting rangelog after garbage collecting %d rows: %v",
rowsAffected,
err,
)
} else {
log.Infof(ctx, "garbage collected %d rows from rangelog", rowsAffected)
}
}

if storeKnobs, ok := s.cfg.TestingKnobs.Store.(*storage.StoreTestingKnobs); ok && storeKnobs.RangelogGCDone != nil {
select {
case storeKnobs.RangelogGCDone <- struct{}{}:
case <-s.stopper.ShouldStop():
// Test has finished
return
}
}
t.Reset(interval)
case <-s.stopper.ShouldStop():
return
}
}
})
}

// Start starts the server on the specified port, starts gossip and initializes
// the node using the engines from the server's context. This is complex since
// it sets up the listeners and the associated port muxing, but especially since
@@ -1679,6 +1783,8 @@ func (s *Server) Start(ctx context.Context) error {
})
}

s.startRangeLogGC(ctx)

// Record that this node joined the cluster in the event log. Since this
// executes a SQL query, this must be done after the SQL layer is ready.
s.node.recordJoinEvent()
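
For context, a minimal standalone sketch of the timer pattern startRangeLogGC
uses above: interval changes arrive on a channel, and the timer is drained and
reset so the new interval takes effect immediately. The function, channel
names, and durations here are invented for illustration and are not part of
this commit.

package main

import (
	"fmt"
	"time"
)

// runPeriodically runs one "pass" per interval and picks up interval changes
// as soon as they arrive, mirroring the loop in startRangeLogGC.
func runPeriodically(intervalCh <-chan time.Duration, stop <-chan struct{}, interval time.Duration) {
	t := time.NewTimer(interval)
	defer t.Stop()
	for {
		select {
		case interval = <-intervalCh:
			// The timer may already have fired; drain its channel before resetting.
			if !t.Stop() {
				<-t.C
			}
			t.Reset(interval)
		case <-t.C:
			fmt.Println("one GC pass would run here")
			t.Reset(interval)
		case <-stop:
			return
		}
	}
}

func main() {
	intervalCh := make(chan time.Duration)
	stop := make(chan struct{})
	go runPeriodically(intervalCh, stop, 50*time.Millisecond)

	time.Sleep(120 * time.Millisecond) // let a couple of passes fire
	intervalCh <- 10 * time.Millisecond // simulate a cluster-setting change
	time.Sleep(50 * time.Millisecond)
	close(stop)
}
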
6 changes: 6 additions & 0 deletions pkg/server/testserver.go
@@ -827,6 +827,12 @@ func (ts *TestServer) ExecutorConfig() interface{} {
return *ts.execCfg
}

// GCRangeLog deletes rows older than the given cutoffTimestamp from
// system.rangelog.
func (ts *TestServer) GCRangeLog(ctx context.Context, cutoffTimestamp time.Time) (int, error) {
return ts.gcRangeLog(ctx, cutoffTimestamp)
}

type testServerFactoryImpl struct{}

// TestServerFactory can be passed to serverutils.InitTestServerFactory
166 changes: 166 additions & 0 deletions pkg/storage/log_test.go
@@ -20,8 +20,10 @@ import (
"encoding/json"
"net/url"
"testing"
"time"

_ "github.com/lib/pq"
"github.com/stretchr/testify/assert"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -395,3 +397,167 @@ func TestLogRebalances(t *testing.T) {
t.Errorf("expected %d RemoveReplica events logged, found %d", e, a)
}
}

func TestLogGC(t *testing.T) {
defer leaktest.AfterTest(t)()
a := assert.New(t)
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
ts := s.(*server.TestServer)
ctx := context.Background()
defer s.Stopper().Stop(ctx)
const testRangeID = 10001
store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
if pErr != nil {
t.Fatal(pErr)
}

rangeLogRowCount := func() int {
var count int
err := db.QueryRowContext(ctx,
`SELECT count(*) FROM system.rangelog WHERE "rangeID" = $1`,
testRangeID,
).Scan(&count)
if err != nil {
t.Fatal(err)
}
return count
}

rangeLogMaxTS := func() time.Time {
var timestamp time.Time
err := db.QueryRowContext(ctx,
`SELECT timestamp FROM system.rangelog WHERE "rangeID" = $1 ORDER BY timestamp DESC LIMIT 1`,
testRangeID,
).Scan(&timestamp)
if err != nil {
t.Fatal(err)
}
return timestamp
}

logEvents := func(count int) {
for i := 0; i < count; i++ {
a.NoError(kvDB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return store.LogReplicaChangeTest(
ctx,
txn,
roachpb.ADD_REPLICA,
roachpb.ReplicaDescriptor{
NodeID: 1,
StoreID: 1,
},
roachpb.RangeDescriptor{
RangeID: testRangeID,
},
storage.ReasonUnknown,
"", // details
)
}))
}
}

// Assert 0 rows before inserting any events
a.Equal(0, rangeLogRowCount())
// Insert 100 events with timestamps up to maxTs1
logEvents(100)
a.Equal(100, rangeLogRowCount())
maxTs1 := rangeLogMaxTS()
// Insert 50 events with timestamps up to maxTs2
logEvents(50)
a.Equal(150, rangeLogRowCount())
maxTs2 := rangeLogMaxTS()
// Insert 25 events with timestamps up to maxTs3
logEvents(25)
a.Equal(175, rangeLogRowCount())
maxTs3 := rangeLogMaxTS()

// GC up to maxTs1
rowsGCd, err := ts.GCRangeLog(ctx, maxTs1)
a.NoError(err)
a.True(rowsGCd >= 100, "Expected rowsGCd >= 100, found %d", rowsGCd)
a.Equal(75, rangeLogRowCount())

// GC up to maxTs2
rowsGCd, err = ts.GCRangeLog(ctx, maxTs2)
a.NoError(err)
a.True(rowsGCd >= 50, "Expected rowsGCd >= 50, found %d", rowsGCd)
a.Equal(25, rangeLogRowCount())

// Insert 2000 more events
logEvents(2000)
a.Equal(2025, rangeLogRowCount())

// GC up to maxTs3
rowsGCd, err = ts.GCRangeLog(ctx, maxTs3)
a.NoError(err)
a.True(rowsGCd >= 25, "Expected rowsGCd >= 25, found %d", rowsGCd)
a.Equal(2000, rangeLogRowCount())

// GC everything
rowsGCd, err = ts.GCRangeLog(ctx, rangeLogMaxTS())
a.NoError(err)
a.True(rowsGCd >= 2000, "Expected rowsGCd >= 2000, found %d", rowsGCd)
a.Equal(0, rangeLogRowCount())
}

func TestLogGCTrigger(t *testing.T) {
defer leaktest.AfterTest(t)()
a := assert.New(t)
gcDone := make(chan struct{})
params := base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
RangelogGCDone: gcDone,
},
},
}
s, db, _ := serverutils.StartServer(t, params)
ctx := context.Background()
defer s.Stopper().Stop(ctx)

rangeLogRowCount := func(ts time.Time) int {
var count int
err := db.QueryRowContext(ctx,
`SELECT count(*) FROM system.rangelog WHERE timestamp <= $1`,
ts,
).Scan(&count)
if err != nil {
t.Fatal(err)
}
return count
}

rangeLogMaxTS := func() time.Time {
var timestamp time.Time
err := db.QueryRowContext(ctx,
`SELECT timestamp FROM system.rangelog ORDER BY timestamp DESC LIMIT 1`,
).Scan(&timestamp)
if err != nil {
t.Fatal(err)
}
return timestamp
}

maxTs := rangeLogMaxTS()
a.NotEqual(0, rangeLogRowCount(maxTs), "expected a non-zero number of events before %v", maxTs)

server.RangeLogGCInterval.Override(&s.ClusterSettings().SV, time.Nanosecond)
// Reading gcDone once ensures that the previous GC run is done (it may have
// finished long ago and be blocked waiting to send on this channel) and that
// the next run has started. Reading it a second time guarantees that this
// next run has also completed, so at least one full GC run is guaranteed
// before the assertions below.
<-gcDone
<-gcDone
a.NotEqual(
rangeLogRowCount(maxTs),
0,
"Expected non zero number of events before %v as gc is not enabled",
maxTs,
)

server.RangeLogTTL.Override(&s.ClusterSettings().SV, time.Nanosecond)
<-gcDone
<-gcDone
a.Equal(0, rangeLogRowCount(maxTs), "expected zero events before %v after GC", maxTs)
}
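
The handshake TestLogGCTrigger relies on can be illustrated with a minimal
standalone sketch; the names and the worker below are invented for
illustration and are not the CockroachDB API. The worker signals on a channel
after every pass, so after overriding a knob the test receives twice: the
first receive may belong to a pass that predates the override, the second
cannot.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var enabled int64 // stands in for a setting such as server.rangelog.ttl
	done := make(chan struct{})

	// Background worker: runs a pass, then signals completion, much like the
	// rangelog GC worker does through the RangelogGCDone testing knob.
	go func() {
		for {
			if atomic.LoadInt64(&enabled) != 0 {
				fmt.Println("a GC pass would delete old rows here")
			}
			done <- struct{}{}
			time.Sleep(time.Millisecond)
		}
	}()

	atomic.StoreInt64(&enabled, 1) // "override" the setting

	<-done // may belong to a pass that started before the override
	<-done // belongs to a pass started after the first receive, hence after the override
	fmt.Println("at least one pass ran with the setting enabled")
}
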
2 changes: 2 additions & 0 deletions pkg/storage/store.go
@@ -853,6 +853,8 @@ type StoreTestingKnobs struct {
DisableLeaseCapacityGossip bool
// BootstrapVersion overrides the version the stores will be bootstrapped with.
BootstrapVersion *cluster.ClusterVersion
// RangelogGCDone is used to notify when a round of rangelog GC is done.
RangelogGCDone chan<- struct{}
}

var _ base.ModuleTestingKnobs = &StoreTestingKnobs{}
