Merge #50089
50089: release-20.1: bulkio: import no longer gets stuck due to errors encountered during import r=miretskiy a=miretskiy

Backport:
  * 1/1 commits from "importccl: Correctly handle errors and cancellations during import." (#49979)
  * 1/1 commits from "bulkio: Correctly group producer/consumers when importing data" (#49995)

Please see individual PRs for details.

/cc @cockroachdb/release
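
The failure mode addressed here is a blocking channel send: a producer goroutine writing progress (or buffered records) to an unbuffered channel can hang forever once the consumer has already exited on an error. Below is a minimal sketch of the cancellation-aware send pattern the backported commits use (illustrative only; `progress` and `sendProgress` are hypothetical names, not taken from the patch):

```go
package importsketch

import "context"

type progress struct{ completedFraction float64 }

// sendProgress never blocks indefinitely: if the surrounding context is
// canceled (for example, because a sibling goroutine in the same group
// failed), the send is abandoned and the context error is returned instead.
func sendProgress(ctx context.Context, progCh chan<- progress, p progress) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case progCh <- p:
		return nil
	}
}
```

The same shape appears twice in the diff below: once when publishing the final progress update, and once when the parallel importer flushes a batch to its record channel.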


Co-authored-by: Yevgeniy Miretskiy <[email protected]>
craig[bot] and Yevgeniy Miretskiy committed Jun 12, 2020
2 parents dd71687 + 92f3815 commit c9db57a
Showing 3 changed files with 232 additions and 25 deletions.
62 changes: 61 additions & 1 deletion pkg/ccl/importccl/import_processor_test.go
@@ -10,7 +10,6 @@ package importccl

import (
"context"
"errors"
"fmt"
"io/ioutil"
"math"
@@ -46,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -429,6 +429,66 @@ func TestImportHonorsResumePosition(t *testing.T) {
}
}

type duplicateKeyErrorAdder struct {
doNothingKeyAdder
}

var _ storagebase.BulkAdder = &duplicateKeyErrorAdder{}

func (a *duplicateKeyErrorAdder) Add(_ context.Context, k roachpb.Key, v []byte) error {
return &storagebase.DuplicateKeyError{Key: k, Value: v}
}

func TestImportHandlesDuplicateKVs(t *testing.T) {
defer leaktest.AfterTest(t)()

batchSize := 13
defer row.TestingSetDatumRowConverterBatchSize(batchSize)()
evalCtx := tree.MakeTestingEvalContext(nil)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: &cluster.Settings{},
ExternalStorage: externalStorageFactory,
BulkAdder: func(
_ context.Context, _ *kv.DB, _ hlc.Timestamp,
opts storagebase.BulkAdderOptions) (storagebase.BulkAdder, error) {
return &duplicateKeyErrorAdder{}, nil
},
TestingKnobs: execinfra.TestingKnobs{
BulkAdderFlushesEveryBatch: true,
},
},
}

// In this test, we'll attempt to import different input formats.
// All imports produce a DuplicateKeyError, which we expect to be propagated.
testSpecs := []testSpec{
newTestSpec(t, csvFormat(), "testdata/csv/data-0"),
newTestSpec(t, mysqlDumpFormat(), "testdata/mysqldump/simple.sql"),
newTestSpec(t, mysqlOutFormat(), "testdata/mysqlout/csv-ish/simple.txt"),
newTestSpec(t, pgCopyFormat(), "testdata/pgcopy/default/test.txt"),
newTestSpec(t, pgDumpFormat(), "testdata/pgdump/simple.sql"),
newTestSpec(t, avroFormat(t, roachpb.AvroOptions_JSON_RECORDS), "testdata/avro/simple-sorted.json"),
}

for _, testCase := range testSpecs {
spec := testCase.getConverterSpec()

t.Run(fmt.Sprintf("duplicate-key-%v", spec.Format.Format), func(t *testing.T) {
progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
defer close(progCh)
go func() {
for range progCh {
}
}()

_, err := runImport(context.Background(), flowCtx, spec, progCh)
require.True(t, errors.HasType(err, &storagebase.DuplicateKeyError{}))
})
}
}

// syncBarrier allows 2 threads (a controller and a worker) to
// synchronize between themselves. A controller portion of the
// barrier waits until worker starts running, and then notifies
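
The new TestImportHandlesDuplicateKVs above asserts that a typed error surfaces from runImport via errors.HasType from github.com/cockroachdb/errors, which matches on the concrete error type even through wrapping. A small, self-contained sketch of that check (the local duplicateKeyError type here is hypothetical, standing in for storagebase.DuplicateKeyError):

```go
package importsketch

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// duplicateKeyError is an illustrative stand-in for storagebase.DuplicateKeyError.
type duplicateKeyError struct{ key string }

func (e *duplicateKeyError) Error() string {
	return fmt.Sprintf("duplicate key: %q", e.key)
}

// hasDuplicateKey mirrors the assertion in the new test: the type check still
// succeeds after the error has been wrapped with extra context.
func hasDuplicateKey(err error) bool {
	return errors.HasType(err, &duplicateKeyError{})
}

func example() bool {
	err := errors.Wrap(&duplicateKeyError{key: "k1"}, "while ingesting batch")
	return hasDuplicateKey(err) // true
}
```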
50 changes: 26 additions & 24 deletions pkg/ccl/importccl/read_import_base.go
@@ -52,13 +52,15 @@ func runImport(
}

// This group holds the go routines that are responsible for producing KV batches.
// After this group is done, we need to close kvCh.
// and ingesting produced KVs.
// Depending on the import implementation both conv.start and conv.readFiles can
// produce KVs so we should close the channel only after *both* are finished.
producerGroup := ctxgroup.WithContext(ctx)
conv.start(producerGroup)
group := ctxgroup.WithContext(ctx)
conv.start(group)

// Read input files into kvs
producerGroup.GoCtx(func(ctx context.Context) error {
group.GoCtx(func(ctx context.Context) error {
defer close(kvCh)
ctx, span := tracing.ChildSpan(ctx, "readImportFiles")
defer tracing.FinishSpan(span)
var inputs map[int32]string
@@ -77,13 +79,6 @@
return conv.readFiles(ctx, inputs, spec.ResumePos, spec.Format, flowCtx.Cfg.ExternalStorage)
})

// This group links together the producers (via producerGroup) and the KV ingester.
group := ctxgroup.WithContext(ctx)
group.Go(func() error {
defer close(kvCh)
return producerGroup.Wait()
})

// Ingest the KVs that the producer group emitted to the chan and the row result
// at the end is one row containing an encoded BulkOpSummary.
var summary *roachpb.BulkOpSummary
@@ -99,11 +94,15 @@
prog.CompletedFraction[i] = 1.0
prog.ResumePos[i] = math.MaxInt64
}
progCh <- prog
return nil
select {
case <-ctx.Done():
return ctx.Err()
case progCh <- prog:
return nil
}
})

if err := group.Wait(); err != nil {
if err = group.Wait(); err != nil {
return nil, err
}

@@ -394,7 +393,7 @@ type importFileContext struct {
// handleCorruptRow reports an error encountered while processing a row
// in an input file.
func handleCorruptRow(ctx context.Context, fileCtx *importFileContext, err error) error {
log.Error(ctx, err)
log.Errorf(ctx, "%+v", err)

if rowErr, isRowErr := err.(*importRowError); isRowErr && fileCtx.rejected != nil {
fileCtx.rejected <- rowErr.row + "\n"
@@ -544,7 +543,7 @@ func runParallelImport(
}

if producer.Err() == nil {
return importer.flush(ctx)
return importer.close(ctx)
}
return producer.Err()
})
@@ -568,22 +567,25 @@ func (p *parallelImporter) add(
return nil
}

// Flush flushes currently accumulated data.
// close closes this importer, flushing remaining accumulated data if needed.
func (p *parallelImporter) close(ctx context.Context) error {
if len(p.b.data) > 0 {
return p.flush(ctx)
}
return nil
}

// flush flushes currently accumulated data.
func (p *parallelImporter) flush(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// if the batch isn't empty, we need to flush it.
if len(p.b.data) > 0 {
p.recordCh <- p.b
case p.recordCh <- p.b:
p.b = batch{
data: make([]interface{}, 0, cap(p.b.data)),
}
return nil
}
return nil
}

func (p *parallelImporter) importWorker(
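
The restructuring above removes the separate producerGroup and puts the producers and the KV ingester into a single group, so an error on either side cancels the shared context and the channel is closed exactly once, after the producers return. A rough sketch of that shape using golang.org/x/sync/errgroup (CockroachDB's ctxgroup is broadly a wrapper over it; produce and consume are illustrative names, not from the patch):

```go
package importsketch

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// runPipeline groups the producer and the consumer under one errgroup: if the
// consumer fails, ctx is canceled and the producer's cancellation-aware sends
// unblock; if the producer fails, closing kvCh lets the consumer drain and exit.
func runPipeline(
	ctx context.Context,
	produce func(context.Context, chan<- int) error,
	consume func(context.Context, <-chan int) error,
) error {
	g, ctx := errgroup.WithContext(ctx)
	kvCh := make(chan int)

	g.Go(func() error {
		// Close the channel once the producer is done so the consumer
		// always terminates, even when the producer errors out early.
		defer close(kvCh)
		return produce(ctx, kvCh)
	})

	g.Go(func() error {
		return consume(ctx, kvCh)
	})

	// Wait returns the first non-nil error from either goroutine.
	return g.Wait()
}
```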
145 changes: 145 additions & 0 deletions pkg/ccl/importccl/read_import_base_test.go
@@ -9,9 +9,19 @@
package importccl

import (
"context"
"math/rand"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestRejectedFilename(t *testing.T) {
@@ -42,3 +52,138 @@ func TestRejectedFilename(t *testing.T) {
}
}
}

// nilDataProducer produces infinite stream of nulls.
// It implements importRowProducer.
type nilDataProducer struct{}

func (p *nilDataProducer) Scan() bool {
return true
}

func (p *nilDataProducer) Err() error {
return nil
}

func (p *nilDataProducer) Skip() error {
return nil
}

func (p *nilDataProducer) Row() (interface{}, error) {
return nil, nil
}

func (p *nilDataProducer) Progress() float32 {
return 0.0
}

var _ importRowProducer = &nilDataProducer{}

// errorReturningConsumer always returns an error.
// It implements importRowConsumer.
type errorReturningConsumer struct {
err error
}

func (d *errorReturningConsumer) FillDatums(
_ interface{}, _ int64, c *row.DatumRowConverter,
) error {
return d.err
}

var _ importRowConsumer = &errorReturningConsumer{}

// nilDataConsumer consumes and emits infinite stream of null.
// it implements importRowConsumer.
type nilDataConsumer struct{}

func (n *nilDataConsumer) FillDatums(_ interface{}, _ int64, c *row.DatumRowConverter) error {
c.Datums[0] = tree.DNull
return nil
}

var _ importRowConsumer = &nilDataConsumer{}

func TestParallelImportProducerHandlesConsumerErrors(t *testing.T) {
defer leaktest.AfterTest(t)()

// Dummy descriptor for import
descr := sqlbase.TableDescriptor{
Name: "test",
Columns: []sqlbase.ColumnDescriptor{
{Name: "column", ID: 1, Type: *types.Int, Nullable: true},
},
}

// Flush datum converter frequently
defer row.TestingSetDatumRowConverterBatchSize(1)()

// Create KV channel and arrange for it to be drained
kvCh := make(chan row.KVBatch)
defer close(kvCh)
go func() {
for range kvCh {
}
}()

// Prepare import context, which flushes to kvCh frequently.
importCtx := &parallelImportContext{
numWorkers: 1,
batchSize: 2,
evalCtx: testEvalCtx,
tableDesc: &descr,
kvCh: kvCh,
}

consumer := &errorReturningConsumer{errors.New("consumer aborted")}

require.Equal(t, consumer.err,
runParallelImport(context.Background(), importCtx,
&importFileContext{}, &nilDataProducer{}, consumer))
}

func TestParallelImportProducerHandlesCancellation(t *testing.T) {
defer leaktest.AfterTest(t)()

// Dummy descriptor for import
descr := sqlbase.TableDescriptor{
Name: "test",
Columns: []sqlbase.ColumnDescriptor{
{Name: "column", ID: 1, Type: *types.Int, Nullable: true},
},
}

// Flush datum converter frequently
defer row.TestingSetDatumRowConverterBatchSize(1)()

// Create KV channel and arrange for it to be drained
kvCh := make(chan row.KVBatch)
defer close(kvCh)
go func() {
for range kvCh {
}
}()

// Prepare import context, which flushes to kvCh frequently.
importCtx := &parallelImportContext{
numWorkers: 1,
batchSize: 2,
evalCtx: testEvalCtx,
tableDesc: &descr,
kvCh: kvCh,
}

// Run a hundred imports, which will timeout shortly after they start.
require.NoError(t, ctxgroup.GroupWorkers(context.Background(), 100,
func(_ context.Context, _ int) error {
timeout := time.Millisecond * time.Duration(250+rand.Intn(250))
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer func(f func()) {
f()
}(cancel)
require.Equal(t, context.DeadlineExceeded,
runParallelImport(ctx, importCtx,
&importFileContext{}, &nilDataProducer{}, &nilDataConsumer{}))
return nil
}))
}
