Skip to content

Commit

Permalink
Merge pull request #45669 from otan-cockroach/temp_schema_cleanup_boo…
Browse files Browse the repository at this point in the history
…galoo

sql: implement a background task for temp object cleanup
  • Loading branch information
otan authored Mar 9, 2020
2 parents ada6f08 + 34b4740 commit c28acc2
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 56 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
<tr><td><code>sql.stats.automatic_collection.min_stale_rows</code></td><td>integer</td><td><code>500</code></td><td>target minimum number of stale rows per table that will trigger a statistics refresh</td></tr>
<tr><td><code>sql.stats.histogram_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>histogram collection mode</td></tr>
<tr><td><code>sql.stats.post_events.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, an event is logged for every CREATE STATISTICS job</td></tr>
<tr><td><code>sql.temp_object_cleaner.cleanup_interval</code></td><td>duration</td><td><code>30m0s</code></td><td>how often to clean up orphaned temporary objects</td></tr>
<tr><td><code>sql.trace.log_statement_execute</code></td><td>boolean</td><td><code>false</code></td><td>set to true to enable logging of executed statements</td></tr>
<tr><td><code>sql.trace.session_eventlog.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to enable session tracing. Note that enabling this may have a non-trivial negative performance impact.</td></tr>
<tr><td><code>sql.trace.txn.enable_threshold</code></td><td>duration</td><td><code>0s</code></td><td>duration beyond which all transactions are traced (set to 0 to disable)</td></tr>
Expand Down
10 changes: 1 addition & 9 deletions pkg/kv/kvserver/protectedts/ptreconcile/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -133,14 +132,7 @@ func (r *Reconciler) run(ctx context.Context, stopper *stop.Stopper) {
}

func (r *Reconciler) isMeta1Leaseholder(ctx context.Context, now hlc.Timestamp) (bool, error) {
repl, _, err := r.localStores.GetReplicaForRangeID(1)
if roachpb.IsRangeNotFoundError(err) {
return false, nil
}
if err != nil {
return false, err
}
return repl.OwnsValidLease(now), nil
return r.localStores.IsMeta1Leaseholder(now)
}

func (r *Reconciler) reconcile(ctx context.Context) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ func NewStores(
}
}

// IsMeta1Leaseholder returns whether the specified stores owns
// the meta1 lease. Returns an error if any.
func (ls *Stores) IsMeta1Leaseholder(now hlc.Timestamp) (bool, error) {
repl, _, err := ls.GetReplicaForRangeID(1)
if roachpb.IsRangeNotFoundError(err) {
return false, nil
}
if err != nil {
return false, err
}
return repl.OwnsValidLease(now), nil
}

// GetStoreCount returns the number of stores this node is exporting.
func (ls *Stores) GetStoreCount() int {
var count int
Expand Down
25 changes: 18 additions & 7 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,14 @@ type Server struct {
debug *debug.Server
// sessionRegistry can be queried for info on running SQL sessions. It is
// shared between the sql.Server and the statusServer.
sessionRegistry *sql.SessionRegistry
jobRegistry *jobs.Registry
statsRefresher *stats.Refresher
replicationReporter *reports.Reporter
engines Engines
internalMemMetrics sql.MemoryMetrics
adminMemMetrics sql.MemoryMetrics
sessionRegistry *sql.SessionRegistry
jobRegistry *jobs.Registry
statsRefresher *stats.Refresher
replicationReporter *reports.Reporter
temporaryObjectCleaner *sql.TemporaryObjectCleaner
engines Engines
internalMemMetrics sql.MemoryMetrics
adminMemMetrics sql.MemoryMetrics
// sqlMemMetrics are used to track memory usage of sql sessions.
sqlMemMetrics sql.MemoryMetrics
protectedtsProvider protectedts.Provider
Expand Down Expand Up @@ -890,6 +891,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

s.debug = debug.NewServer(s.ClusterSettings(), s.pgServer.HBADebugFn())

s.temporaryObjectCleaner = sql.NewTemporaryObjectCleaner(
s.st,
s.db,
s.distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory,
s.status,
s.node.stores.IsMeta1Leaseholder,
sqlExecutorTestingKnobs,
)

return s, nil
}

Expand Down Expand Up @@ -1571,6 +1581,7 @@ func (s *Server) Start(ctx context.Context) error {
time.NewTicker,
)
s.replicationReporter.Start(ctx, s.stopper)
s.temporaryObjectCleaner.Start(ctx, s.stopper)

// Cluster ID should have been determined by this point.
if s.rpcContext.ClusterID.Get() == uuid.Nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,8 +844,15 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})
func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.sessionEventf(ctx, "finishing connExecutor")

if ex.hasCreatedTemporarySchema {
err := cleanupSessionTempObjects(ctx, ex.server, ex.sessionID)
if ex.hasCreatedTemporarySchema && !ex.server.cfg.TestingKnobs.DisableTempObjectsCleanupOnSessionExit {
ie := MakeInternalExecutor(ctx, ex.server, MemoryMetrics{}, ex.server.cfg.Settings)
err := cleanupSessionTempObjects(
ctx,
ex.server.cfg.Settings,
ex.server.cfg.DB,
&ie,
ex.sessionID,
)
if err != nil {
log.Errorf(
ctx,
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,17 @@ type ExecutorTestingKnobs struct {
// optimization). This is only called when the Executor is the one doing the
// committing.
BeforeAutoCommit func(ctx context.Context, stmt string) error

// DisableTempObjectsCleanupOnSessionExit disables cleaning up temporary schemas
// and tables when a session is closed.
DisableTempObjectsCleanupOnSessionExit bool
// TempObjectsCleanupCh replaces the time.Ticker.C channel used for scheduling
// a cleanup on every temp object in the cluster. If this is set, the job
// will now trigger when items come into this channel.
TempObjectsCleanupCh chan time.Time
// OnTempObjectsCleanupDone will trigger when the temporary objects cleanup
// job is done.
OnTempObjectsCleanupDone func()
}

// PGWireTestingKnobs contains knobs for the pgwire module.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ func applyOverrides(o sqlbase.InternalExecutorSessionDataOverride, sd *sessionda
if o.ApplicationName != "" {
sd.ApplicationName = o.ApplicationName
}
if o.SearchPath != nil {
sd.SearchPath = *o.SearchPath
}
}

func (ie *InternalExecutor) maybeRootSessionDataOverride(
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sqlbase/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package sqlbase

import "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"

// InternalExecutorSessionDataOverride is used by the InternalExecutor interface
// to allow control over some of the session data.
type InternalExecutorSessionDataOverride struct {
Expand All @@ -19,4 +21,6 @@ type InternalExecutorSessionDataOverride struct {
Database string
// ApplicationName represents the application that the query runs under.
ApplicationName string
// SearchPath represents the namespaces to search in.
SearchPath *sessiondata.SearchPath
}
6 changes: 6 additions & 0 deletions pkg/sql/sqltelemetry/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ var (
CreateTempViewCounter = telemetry.GetCounterOnce("sql.schema.create_temp_view")
)

var (
// TempObjectCleanerDeletionCounter is to be incremented every time a temporary schema
// has been deleted by the temporary object cleaner.
TempObjectCleanerDeletionCounter = telemetry.GetCounterOnce("sql.schema.temp_object_cleaner.num_cleaned")
)

// SchemaChangeCreate is to be incremented every time a CREATE
// schema change was made.
func SchemaChangeCreate(typ string) telemetry.Counter {
Expand Down
Loading

0 comments on commit c28acc2

Please sign in to comment.