Skip to content

Commit

Permalink
sql: implement a background task for temp object cleanup
Browse files Browse the repository at this point in the history
This PR introduces a `TempObjectCleaner`, which is a background task
that runs every 30mins on every node, but will only be executed by the
node that has the meta1 leaseholder. This task will cleanup dangling
temporary schemas that could not have been cleaned up by the connection.
The job also comes with logging and metrics. A simple test mocking the
ticking mechanism has also been included.

Also refactored the existing temporary schema deletion to use ExecEx,
which overrides the SearchPath / User executing the deletion at a lower
layer rather than using `SetSessionExecutor`.

Release note (sql change): Introduced a temporary table cleanup job that
runs once every 30mins per cluster that will remove any temporary
schemas and their related objects that did not get removed cleanly when
the connection closed.
  • Loading branch information
otan committed Mar 4, 2020
1 parent fe495ee commit 15cbf16
Show file tree
Hide file tree
Showing 10 changed files with 432 additions and 56 deletions.
10 changes: 1 addition & 9 deletions pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -133,14 +132,7 @@ func (r *Reconciler) run(ctx context.Context, stopper *stop.Stopper) {
}

func (r *Reconciler) isMeta1Leaseholder(ctx context.Context, now hlc.Timestamp) (bool, error) {
repl, _, err := r.localStores.GetReplicaForRangeID(1)
if roachpb.IsRangeNotFoundError(err) {
return false, nil
}
if err != nil {
return false, err
}
return repl.OwnsValidLease(now), nil
return r.localStores.IsMeta1Leaseholder(now)
}

func (r *Reconciler) reconcile(ctx context.Context) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ func NewStores(
}
}

// IsMeta1Leaseholder returns whether the specified stores owns
// the meta1 lease. Returns an error if any.
func (ls *Stores) IsMeta1Leaseholder(now hlc.Timestamp) (bool, error) {
repl, _, err := ls.GetReplicaForRangeID(1)
if roachpb.IsRangeNotFoundError(err) {
return false, nil
}
if err != nil {
return false, err
}
return repl.OwnsValidLease(now), nil
}

// GetStoreCount returns the number of stores this node is exporting.
func (ls *Stores) GetStoreCount() int {
var count int
Expand Down
25 changes: 18 additions & 7 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,14 @@ type Server struct {
debug *debug.Server
// sessionRegistry can be queried for info on running SQL sessions. It is
// shared between the sql.Server and the statusServer.
sessionRegistry *sql.SessionRegistry
jobRegistry *jobs.Registry
statsRefresher *stats.Refresher
replicationReporter *reports.Reporter
engines Engines
internalMemMetrics sql.MemoryMetrics
adminMemMetrics sql.MemoryMetrics
sessionRegistry *sql.SessionRegistry
jobRegistry *jobs.Registry
statsRefresher *stats.Refresher
replicationReporter *reports.Reporter
temporaryObjectCleaner *sql.TemporaryObjectCleaner
engines Engines
internalMemMetrics sql.MemoryMetrics
adminMemMetrics sql.MemoryMetrics
// sqlMemMetrics are used to track memory usage of sql sessions.
sqlMemMetrics sql.MemoryMetrics
protectedtsProvider protectedts.Provider
Expand Down Expand Up @@ -873,6 +874,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

s.debug = debug.NewServer(s.ClusterSettings(), s.pgServer.HBADebugFn())

s.temporaryObjectCleaner = sql.NewTemporaryObjectCleaner(
s.st,
s.db,
s.distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory,
s.status,
s.node.stores.IsMeta1Leaseholder,
sqlExecutorTestingKnobs,
)

return s, nil
}

Expand Down Expand Up @@ -1554,6 +1564,7 @@ func (s *Server) Start(ctx context.Context) error {
time.NewTicker,
)
s.replicationReporter.Start(ctx, s.stopper)
s.temporaryObjectCleaner.Start(ctx, s.stopper)

// Cluster ID should have been determined by this point.
if s.rpcContext.ClusterID.Get() == uuid.Nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,8 +844,15 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})
func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.sessionEventf(ctx, "finishing connExecutor")

if ex.hasCreatedTemporarySchema {
err := cleanupSessionTempObjects(ctx, ex.server, ex.sessionID)
if ex.hasCreatedTemporarySchema && !ex.server.cfg.TestingKnobs.DisableTempObjectsCleanupOnSessionExit {
ie := MakeInternalExecutor(ctx, ex.server, MemoryMetrics{}, ex.server.cfg.Settings)
err := cleanupSessionTempObjects(
ctx,
ex.server.cfg.Settings,
ex.server.cfg.DB,
&ie,
ex.sessionID,
)
if err != nil {
log.Errorf(
ctx,
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,14 @@ type ExecutorTestingKnobs struct {
// optimization). This is only called when the Executor is the one doing the
// committing.
BeforeAutoCommit func(ctx context.Context, stmt string) error

// DisableTempObjectsCleanupOnSessionExit disables cleaning up temporary schemas
// and tables when a session is closed.
DisableTempObjectsCleanupOnSessionExit bool
// TempObjectsCleanupCh replaces the time.Ticker.C channel used for scheduling
// a cleanup on every temp object in the cluster. If this is set, the job
// will now trigger when items come into this channel.
TempObjectsCleanupCh chan time.Time
}

// PGWireTestingKnobs contains knobs for the pgwire module.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ func applyOverrides(o sqlbase.InternalExecutorSessionDataOverride, sd *sessionda
if o.ApplicationName != "" {
sd.ApplicationName = o.ApplicationName
}
if o.SearchPath != nil {
sd.SearchPath = *o.SearchPath
}
}

func (ie *InternalExecutor) maybeRootSessionDataOverride(
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sqlbase/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package sqlbase

import "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"

// InternalExecutorSessionDataOverride is used by the InternalExecutor interface
// to allow control over some of the session data.
type InternalExecutorSessionDataOverride struct {
Expand All @@ -19,4 +21,6 @@ type InternalExecutorSessionDataOverride struct {
Database string
// ApplicationName represents the application that the query runs under.
ApplicationName string
// SearchPath represents the namespsaces to search in.
SearchPath *sessiondata.SearchPath
}
6 changes: 6 additions & 0 deletions pkg/sql/sqltelemetry/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ var (
CreateTempViewCounter = telemetry.GetCounterOnce("sql.schema.create_temp_view")
)

var (
// TempObjectCleanerDeletionCounter is to be incremented every time a temporary schema
// has been deleted by the temporary object cleaner.
TempObjectCleanerDeletionCounter = telemetry.GetCounterOnce("sql.schema.temp_object_cleaner.num_cleaned")
)

// SchemaChangeCreate is to be incremented every time a CREATE
// schema change was made.
func SchemaChangeCreate(typ string) telemetry.Counter {
Expand Down
Loading

0 comments on commit 15cbf16

Please sign in to comment.