Skip to content

Commit

Permalink
Merge #40676 #40838
Browse files Browse the repository at this point in the history
40676: sql: stop presplitting spans on every import proc r=pbardea a=pbardea

We are currently presplitting spans (on every table and index boundary)
for every processor. We should only be doing this once so this PR moves
it to the planning stage.

Addresses #39072.

Release note: None

40838: makefile: increase test timeout r=andreimatei a=andreimatei

This patch increases the test timeout for every package from 12m to 20m
(and for race tests, from 25m to 30m).
This is motivated by the sql TestLogic, which has recently been inching
towards the 12m limit (according to TeamCity history) and is sometimes timing
out. Hopefully it's because we've been adding more tests...

Fixes #40572

Release note: None

Release justification: fix timeout flakes

Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
3 people committed Sep 17, 2019
3 parents 19086ba + 89f4cdf + 45f5cd8 commit faaa699
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 38 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ SUBTESTS :=
LINTTIMEOUT := 20m

## Test timeout to use for regular tests.
TESTTIMEOUT := 12m
TESTTIMEOUT := 20m

## Test timeout to use for race tests.
RACETIMEOUT := 25m
RACETIMEOUT := 30m

## Test timeout to use for acceptance tests.
ACCEPTANCETIMEOUT := 30m
Expand Down
36 changes: 0 additions & 36 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/row"
Expand All @@ -31,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -255,45 +252,12 @@ func (cp *readImportDataProcessor) emitKvs(ctx context.Context, kvCh <-chan row.
return nil
}

// presplitTableBoundaries issues an AdminSplit at the start key of every index
// span of every table in the processor's spec, and then sends an AdminScatter
// request covering that span. A split error aborts the loop and is returned to
// the caller; a scatter error is only logged, and the loop continues.
func (cp *readImportDataProcessor) presplitTableBoundaries(ctx context.Context) error {
	// TODO(jeffreyxiao): Remove this check in 20.1.
	// On clusters where the sticky bit version is active, the splits are given
	// an expiration one hour from now so that the merge queue does not
	// automatically merge them away. On older clusters the merge queue is
	// disabled via gossip instead, so the zero timestamp (expire immediately)
	// is sufficient.
	var expiration hlc.Timestamp
	if cp.flowCtx.Cfg.Settings.Version.IsActive(cluster.VersionStickyBit) {
		expiration = cp.flowCtx.Cfg.DB.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
	}

	for _, table := range cp.spec.Tables {
		for _, idxSpan := range table.Desc.AllIndexSpans() {
			splitKey := idxSpan.Key
			if err := cp.flowCtx.Cfg.DB.AdminSplit(ctx, splitKey, splitKey, expiration); err != nil {
				return err
			}

			log.VEventf(ctx, 1, "scattering index range %s", splitKey)
			req := &roachpb.AdminScatterRequest{RequestHeader: roachpb.RequestHeaderFromSpan(idxSpan)}
			_, pErr := client.SendWrapped(ctx, cp.flowCtx.Cfg.DB.NonTransactionalSender(), req)
			if pErr != nil {
				// Scattering is best-effort: report the failure and move on.
				log.Errorf(ctx, "failed to scatter span %s: %s", splitKey, pErr)
			}
		}
	}
	return nil
}

// ingestKvs drains kvs from the channel until it closes, ingesting them using
// the BulkAdder. It handles the required buffering/sorting/etc.
func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan row.KVBatch) error {
ctx, span := tracing.ChildSpan(ctx, "ingestKVs")
defer tracing.FinishSpan(span)

if err := cp.presplitTableBoundaries(ctx); err != nil {
return err
}

writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}

flushSize := storageccl.MaxImportBatchSize(cp.flowCtx.Cfg.Settings)
Expand Down
38 changes: 38 additions & 0 deletions pkg/sql/distsql_plan_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
Expand Down Expand Up @@ -601,6 +602,39 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
return samples, nil
}

// presplitTableBoundaries walks every index span of every table in tables,
// issuing an AdminSplit at the span's start key followed by an AdminScatter
// request for the span. It returns the first split error it encounters;
// scatter errors are logged and otherwise ignored.
func presplitTableBoundaries(
	ctx context.Context,
	cfg *ExecutorConfig,
	tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
) error {
	// TODO(jeffreyxiao): Remove this check in 20.1.
	// When the cluster version supports sticky bits, each split gets an
	// expiration one hour in the future so the merge queue does not
	// automatically merge it away. Otherwise the merge queue is disabled via
	// gossip, so the split can simply be set to expire immediately (the zero
	// timestamp).
	var expiration hlc.Timestamp
	if cfg.Settings.Version.IsActive(cluster.VersionStickyBit) {
		expiration = cfg.DB.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
	}

	for _, table := range tables {
		for _, idxSpan := range table.Desc.AllIndexSpans() {
			splitKey := idxSpan.Key
			if err := cfg.DB.AdminSplit(ctx, splitKey, splitKey, expiration); err != nil {
				return err
			}

			log.VEventf(ctx, 1, "scattering index range %s", splitKey)
			req := &roachpb.AdminScatterRequest{RequestHeader: roachpb.RequestHeaderFromSpan(idxSpan)}
			_, pErr := client.SendWrapped(ctx, cfg.DB.NonTransactionalSender(), req)
			if pErr != nil {
				// A failed scatter is not fatal; log it and keep going.
				log.Errorf(ctx, "failed to scatter span %s: %s", splitKey, pErr)
			}
		}
	}
	return nil
}

// DistIngest is used by IMPORT to run a DistSQL flow to ingest data by starting
// reader processes on many nodes that each read and ingest their assigned files
// and then send back a summary of what they ingested. The combined summary is
Expand Down Expand Up @@ -688,6 +722,10 @@ func DistIngest(
return nil
})

if err := presplitTableBoundaries(ctx, phs.ExecCfg(), tables); err != nil {
return roachpb.BulkOpSummary{}, err
}

recv := MakeDistSQLReceiver(
ctx,
&metadataCallbackWriter{rowResultWriter: rowResultWriter, fn: metaFn},
Expand Down

0 comments on commit faaa699

Please sign in to comment.