72638: catalog/lease: avoid using context.Background directly r=ajwerner a=knz

First commit from #72651
Informs #58938.

- `(*AmbientContext).AnnotateCtx()` - takes care of connecting the
  context to the tracer
- `logtags.FromContext` / `logtags.WithTags` - reproduces the logging
  tags on the child context.
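
For illustration, here is a minimal sketch (a hypothetical helper, not the
exact code added in this commit) of how the two mechanisms above combine
when deriving a context for a background task:

```go
package server

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/logtags"
)

// backgroundCtxWithTags is a hypothetical helper: it derives a fresh
// context for an async task instead of using context.Background() directly.
func backgroundCtxWithTags(
	ac log.AmbientContext, callerCtx context.Context,
) (context.Context, context.CancelFunc) {
	// AnnotateCtx connects the new context to the tracer and attaches the
	// server-wide log tags.
	ctx := ac.AnnotateCtx(context.Background())
	// Reproduce the caller's logging tags on the child context without
	// inheriting the caller's cancellation.
	ctx = logtags.WithTags(ctx, logtags.FromContext(callerCtx))
	return context.WithCancel(ctx)
}
```

The jobs-registry and lease-renewal changes below follow this same shape:
annotate a fresh background context, then copy the caller's tags onto it.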

Release note: None

72640: sql/types: types.EnumMetadata implements encoding.TextMarshaler interface r=chengxiong-ruan a=chengxiong-ruan

Fixes #63379

`types.EnumMetadata` needs to implement the `encoding.TextMarshaler`
interface so that gogo/protobuf won't panic when text-marshaling a
protobuf struct that has a child field of type `types.EnumMetadata`.
See the issue for more details on why it would fail without this
fix.
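
For context, here is a minimal standalone sketch (hypothetical type and
field names, standard library only) of what implementing
`encoding.TextMarshaler` looks like; once `MarshalText` is defined, text
marshalers have a well-defined representation to use for the type:

```go
package main

import (
	"encoding"
	"fmt"
)

// enumMetadata is a stand-in for a type like types.EnumMetadata.
type enumMetadata struct {
	logicalReps []string
}

// MarshalText implements encoding.TextMarshaler.
func (e enumMetadata) MarshalText() ([]byte, error) {
	return []byte(fmt.Sprintf("enum(%v)", e.logicalReps)), nil
}

// Compile-time check that the interface is satisfied.
var _ encoding.TextMarshaler = enumMetadata{}

func main() {
	b, err := enumMetadata{logicalReps: []string{"open", "closed"}}.MarshalText()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // enum([open closed])
}
```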

Release note: None

72644: server: improve the tenant context r=RaduBerinde a=knz

All commits but the last from #72638
(Reviewers: only review last commit)

Informs #58938

72647: admission,server: context improvements r=RaduBerinde a=knz

All commits but the last 2 from #72644
(Reviewers: only review last 2 commits)

Informs #58938

72650: streamingest: wait for all goroutines on streamIngestProcessor shutdown  r=andreimatei a=andreimatei

This processor was failing to stop and wait for some of the goroutines it
spawned, which is not nice (in particular, it's likely that those
goroutines were still using a tracing span after the span had been
finished).
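
For illustration, here is a minimal standard-library sketch (not the actual
processor code, which uses `ctxgroup` as shown in the diff below) of the
shutdown pattern adopted here: cancel the producer goroutines' context, then
drain the merged channel until they close it, so that no goroutine outlives
the processor's `close()`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// startMerge fans events from the inputs into one channel and returns a
// cancelAndWait func that stops the producers and waits for them to finish.
func startMerge(ctx context.Context, inputs []<-chan int) (<-chan int, func()) {
	merged := make(chan int)
	ctx, cancel := context.WithCancel(ctx)

	var wg sync.WaitGroup
	for _, in := range inputs {
		in := in
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case v, ok := <-in:
					if !ok {
						return
					}
					select {
					case merged <- v:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(merged) // signals that all producers are done
	}()

	// cancelAndWait must not be called concurrently with other consumers of
	// merged, since it drains the channel itself.
	cancelAndWait := func() {
		cancel()
		for range merged {
		}
	}
	return merged, cancelAndWait
}

func main() {
	in := make(chan int, 2)
	in <- 1
	in <- 2
	close(in)
	merged, cancelAndWait := startMerge(context.Background(), []<-chan int{in})
	fmt.Println(<-merged) // 1
	cancelAndWait()       // drains remaining events and waits for producers
}
```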

Release note: None

72652: internal/sqlsmith: do not use crdb_internal.start_replication_stream r=yuzefovich a=yuzefovich

Fixes: #72633.
Fixes: #72634.

Release note: None

72661: migration: add missing leaktests r=andreimatei a=andreimatei

Release note: None

72668: roachtest: add assignment-cast related failures to pgjdbc r=RichardJCai a=rafiss

fixes #72636

Release note: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Chengxiong Ruan <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
6 people committed Nov 11, 2021
9 parents dbad874 + ca0df0f + 4271af4 + c14ece7 + 465c8ea + 42db1dd + 3fc57f8 + ca89099 + 0049d6f commit 4300949
Showing 23 changed files with 254 additions and 88 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -39,6 +39,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/metric",
@@ -50,7 +51,6 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@org_golang_x_sync//errgroup",
],
)

62 changes: 38 additions & 24 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -29,13 +29,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"golang.org/x/sync/errgroup"
)

var minimumFlushInterval = settings.RegisterPublicDurationSettingWithExplicitUnit(
@@ -111,6 +111,10 @@ type streamIngestionProcessor struct {
// closePoller is used to shutdown the poller that checks the job for a
// cutover signal.
closePoller chan struct{}
// cancelMergeAndWait cancels the merging goroutines and waits for them to
// finish. It cannot be called concurrently with Next(), as it consumes from
// the merged channel.
cancelMergeAndWait func()

// mu is used to provide thread-safe read-write operations to ingestionErr
// and pollingErr.
@@ -234,11 +238,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
eventChs[id] = eventCh
errChs[id] = errCh
}
sip.eventCh, err = sip.merge(ctx, eventChs, errChs)
if err != nil {
sip.MoveToDraining(err)
return
}
sip.eventCh = sip.merge(ctx, eventChs, errChs)
}

// Next is part of the RowSource interface.
@@ -291,20 +291,26 @@ func (sip *streamIngestionProcessor) ConsumerClosed() {
}

func (sip *streamIngestionProcessor) close() {
if sip.InternalClose() {
if sip.batcher != nil {
sip.batcher.Close()
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
}
if sip.closePoller != nil {
close(sip.closePoller)
// Wait for the goroutine to return so that we do not access processor
// state once it has shutdown.
sip.pollingWaitGroup.Wait()
}
if sip.Closed {
return
}

if sip.batcher != nil {
sip.batcher.Close()
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
}
close(sip.closePoller)
// Wait for the processor goroutine to return so that we do not access
// processor state once it has shutdown.
sip.pollingWaitGroup.Wait()
// Wait for the merge goroutine.
if sip.cancelMergeAndWait != nil {
sip.cancelMergeAndWait()
}

sip.InternalClose()
}

// checkForCutoverSignal periodically loads the job progress to check for the
@@ -365,19 +371,27 @@ func (sip *streamIngestionProcessor) merge(
ctx context.Context,
partitionStreams map[string]chan streamingccl.Event,
errorStreams map[string]chan error,
) (chan partitionEvent, error) {
) chan partitionEvent {
merged := make(chan partitionEvent)

var g errgroup.Group
ctx, cancel := context.WithCancel(ctx)
g := ctxgroup.WithContext(ctx)

sip.cancelMergeAndWait = func() {
cancel()
// Wait until the merged channel is closed by the goroutine above.
for range merged {
}
}

for partition, eventCh := range partitionStreams {
partition := partition
eventCh := eventCh
errCh, ok := errorStreams[partition]
if !ok {
return nil, errors.Newf("could not find error channel for partition %q", partition)
log.Fatalf(ctx, "could not find error channel for partition %q", partition)
}
g.Go(func() error {
g.GoCtx(func(ctx context.Context) error {
ctxDone := ctx.Done()
for {
select {
@@ -412,7 +426,7 @@
close(merged)
}()

return merged, nil
return merged
}

// consumeEvents handles processing events on the merged event queue and returns
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/pgjdbc_blocklist.go
@@ -46,6 +46,8 @@ var pgjdbcBlockList22_1 = blocklist{
"org.postgresql.test.jdbc2.ArrayTest.testSetArray[binary = REGULAR]": "26925",
"org.postgresql.test.jdbc2.ArrayTest.testSetNullArrays[binary = FORCE]": "71714",
"org.postgresql.test.jdbc2.ArrayTest.testSetNullArrays[binary = REGULAR]": "71714",
"org.postgresql.test.jdbc2.ArrayTest.testSetNull[binary = FORCE]": "71714",
"org.postgresql.test.jdbc2.ArrayTest.testSetNull[binary = REGULAR]": "71714",
"org.postgresql.test.jdbc2.ArrayTest.testSetPrimitiveArraysObjects[binary = FORCE]": "26925",
"org.postgresql.test.jdbc2.ArrayTest.testSetPrimitiveArraysObjects[binary = REGULAR]": "26925",
"org.postgresql.test.jdbc2.ArrayTest.testSetPrimitiveObjects[binary = FORCE]": "26925",
1 change: 1 addition & 0 deletions pkg/internal/sqlsmith/schema.go
@@ -494,6 +494,7 @@ var functions = func() map[tree.FunctionClass]map[oid.Oid][]function {
"crdb_internal.create_join_token",
"crdb_internal.reset_multi_region_zone_configs_for_database",
"crdb_internal.reset_index_usage_stats",
"crdb_internal.start_replication_stream",
} {
skip = skip || strings.Contains(def.Name, substr)
}
4 changes: 3 additions & 1 deletion pkg/jobs/registry.go
@@ -249,7 +249,9 @@ func (r *Registry) ID() base.SQLInstanceID {
// makeCtx returns a new context from r's ambient context and an associated
// cancel func.
func (r *Registry) makeCtx() (context.Context, func()) {
return context.WithCancel(r.ac.AnnotateCtx(r.serverCtx))
ctx := r.ac.AnnotateCtx(context.Background())
ctx = logtags.WithTags(ctx, logtags.FromContext(r.serverCtx))
return context.WithCancel(ctx)
}

// MakeJobID generates a new job ID.
2 changes: 2 additions & 0 deletions pkg/migration/migrationcluster/main_test.go
@@ -27,3 +27,5 @@ func TestMain(m *testing.M) {
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
2 changes: 2 additions & 0 deletions pkg/migration/migrationmanager/main_test.go
@@ -27,3 +27,5 @@ func TestMain(m *testing.M) {
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
2 changes: 2 additions & 0 deletions pkg/migration/migrationmanager/manager_external_test.go
@@ -489,6 +489,8 @@ SELECT id

// Test that the precondition prevents migrations from being run.
func TestPrecondition(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Start by running v0. We want the precondition of v1 to prevent
// us from reaching v1 (or v2). We want the precondition to not be
2 changes: 1 addition & 1 deletion pkg/server/drain.go
@@ -207,7 +207,7 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf

// Drain the SQL leases. This must be done after the pgServer has
// given sessions a chance to finish ongoing work.
s.sqlServer.leaseMgr.SetDraining(true /* drain */, reporter)
s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter)

// Done. This executes the defers set above to drain SQL leases.
return nil
22 changes: 12 additions & 10 deletions pkg/server/server.go
@@ -450,15 +450,17 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}
tcsFactory := kvcoord.NewTxnCoordSenderFactory(txnCoordSenderFactoryCfg, distSender)

gcoords, metrics := admission.NewGrantCoordinators(admission.Options{
MinCPUSlots: 1,
MaxCPUSlots: 100000, /* TODO(sumeer): add cluster setting */
SQLKVResponseBurstTokens: 100000, /* TODO(sumeer): add cluster setting */
SQLSQLResponseBurstTokens: 100000, /* arbitrary, and unused */
SQLStatementLeafStartWorkSlots: 100, /* arbitrary, and unused */
SQLStatementRootStartWorkSlots: 100, /* arbitrary, and unused */
Settings: st,
})
gcoords, metrics := admission.NewGrantCoordinators(
cfg.AmbientCtx,
admission.Options{
MinCPUSlots: 1,
MaxCPUSlots: 100000, /* TODO(sumeer): add cluster setting */
SQLKVResponseBurstTokens: 100000, /* TODO(sumeer): add cluster setting */
SQLSQLResponseBurstTokens: 100000, /* arbitrary, and unused */
SQLStatementLeafStartWorkSlots: 100, /* arbitrary, and unused */
SQLStatementRootStartWorkSlots: 100, /* arbitrary, and unused */
Settings: st,
})
for i := range metrics {
registry.AddMetricStruct(metrics[i])
}
@@ -1635,7 +1637,7 @@ func (s *Server) PreStart(ctx context.Context) error {
return err
}
// Stores have been initialized, so Node can now provide Pebble metrics.
s.storeGrantCoords.SetPebbleMetricsProvider(s.node)
s.storeGrantCoords.SetPebbleMetricsProvider(ctx, s.node)

log.Event(ctx, "started node")
if err := s.startPersistingHLCUpperBound(
9 changes: 8 additions & 1 deletion pkg/server/server_sql.go
@@ -112,6 +112,7 @@ import (
// standalone SQLServer instances per tenant (the KV layer is shared across all
// tenants).
type SQLServer struct {
ambientCtx log.AmbientContext
stopper *stop.Stopper
sqlIDContainer *base.SQLIDContainer
pgServer *pgwire.Server
@@ -906,6 +907,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}

return &SQLServer{
ambientCtx: cfg.BaseConfig.AmbientCtx,
stopper: cfg.stopper,
sqlIDContainer: cfg.nodeIDContainer,
pgServer: pgServer,
@@ -1120,7 +1122,7 @@ func (s *SQLServer) preStart(

// Delete all orphaned table leases created by a prior instance of this
// node. This also uses SQL.
s.leaseMgr.DeleteOrphanedLeases(orphanedLeasesTimeThresholdNanos)
s.leaseMgr.DeleteOrphanedLeases(ctx, orphanedLeasesTimeThresholdNanos)

// Start scheduled jobs daemon.
jobs.StartJobSchedulerDaemon(
Expand Down Expand Up @@ -1164,3 +1166,8 @@ func (s *SQLServer) SQLInstanceID() base.SQLInstanceID {
func (s *SQLServer) StartDiagnostics(ctx context.Context) {
s.diagnosticsReporter.PeriodicallyReportDiagnostics(ctx, s.stopper)
}

// AnnotateCtx annotates the given context with the server tracer and tags.
func (s *SQLServer) AnnotateCtx(ctx context.Context) context.Context {
return s.ambientCtx.AnnotateCtx(ctx)
}
34 changes: 30 additions & 4 deletions pkg/server/tenant.go
@@ -67,7 +67,7 @@ func StartTenant(
return nil, "", "", err
}

args, err := makeTenantSQLServerArgs(stopper, kvClusterName, baseCfg, sqlCfg)
args, err := makeTenantSQLServerArgs(ctx, stopper, kvClusterName, baseCfg, sqlCfg)
if err != nil {
return nil, "", "", err
}
@@ -96,7 +96,18 @@ func StartTenant(
// TODO(davidh): Do we need to force this to be false?
baseCfg.SplitListenSQL = false

background := baseCfg.AmbientCtx.AnnotateCtx(context.Background())
// Add the server tags to the startup context.
//
// We use args.BaseConfig here instead of baseCfg directly because
// makeTenantSQLArgs defines its own AmbientCtx instance and it's
// defined by-value.
ctx = args.BaseConfig.AmbientCtx.AnnotateCtx(ctx)

// Add the server tags to a generic background context for use
// by async goroutines.
// We can only annotate the context after makeTenantSQLServerArgs
// has defined the instance ID container in the AmbientCtx.
background := args.BaseConfig.AmbientCtx.AnnotateCtx(context.Background())

// StartListenRPCAndSQL will replace the SQLAddr fields if we choose
// to share the SQL and gRPC port so here, since the tenant config
@@ -164,6 +175,16 @@ func StartTenant(
args.sqlStatusServer = tenantStatusServer
s, err := newSQLServer(ctx, args)
tenantStatusServer.sqlServer = s
// Also add the SQL instance tag to the tenant status server's
// ambient context.
//
// We use the tag "sqli" instead of just "sql" because the latter is
// too generic and would be hard to search if someone was looking at
// a log message and wondering what it stands for.
//
// TODO(knz): find a way to share common logging tags between
// multiple AmbientContext instances.
tenantStatusServer.AmbientContext.AddLogTag("sqli", s.sqlIDContainer)

if err != nil {
return nil, "", "", err
@@ -355,7 +376,11 @@ func loadVarsHandler(
}

func makeTenantSQLServerArgs(
stopper *stop.Stopper, kvClusterName string, baseCfg BaseConfig, sqlCfg SQLConfig,
startupCtx context.Context,
stopper *stop.Stopper,
kvClusterName string,
baseCfg BaseConfig,
sqlCfg SQLConfig,
) (sqlServerArgs, error) {
st := baseCfg.Settings

@@ -366,6 +391,7 @@
// too generic and would be hard to search if someone was looking at
// a log message and wondering what it stands for.
baseCfg.AmbientCtx.AddLogTag("sqli", instanceIDContainer)
startupCtx = baseCfg.AmbientCtx.AnnotateCtx(startupCtx)

// TODO(tbg): this is needed so that the RPC heartbeats between the testcluster
// and this tenant work.
@@ -480,7 +506,7 @@

recorder := status.NewMetricsRecorder(clock, nil, rpcContext, nil, st)

runtime := status.NewRuntimeStatSampler(context.Background(), clock)
runtime := status.NewRuntimeStatSampler(startupCtx, clock)
registry.AddMetricStruct(runtime)

esb := &externalStorageBuilder{}
7 changes: 5 additions & 2 deletions pkg/sql/catalog/lease/descriptor_state.go
@@ -21,6 +21,7 @@
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

@@ -259,7 +260,7 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState
return nil
}
if l := maybeRemoveLease(); l != nil {
releaseLease(l, t.m)
releaseLease(ctx, l, t.m)
}
}

@@ -273,7 +274,9 @@ func (t *descriptorState) maybeQueueLeaseRenewal(
}

// Start the renewal. When it finishes, it will reset t.renewalInProgress.
return t.stopper.RunAsyncTask(context.Background(),
newCtx := m.ambientCtx.AnnotateCtx(context.Background())
newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx))
return t.stopper.RunAsyncTask(newCtx,
"lease renewal", func(ctx context.Context) {
t.startLeaseRenewal(ctx, m, id, name)
})