importccl: Correctly handle errors and cancellations during import.
Fixes cockroachdb#49977

The parallel importer could get stuck due to a race between emitting
import batches and checking for context cancellation (whether due to an
unforeseen error or to explicit context cancellation).

Fix the race condition, and add tests verifying correct behavior.

Release notes (bug fix): correctly handle import cancellation and errors.
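
For context, the deadlock fixed below has a classic Go shape: a non-blocking cancellation check (a select with a default case) followed by an unconditional channel send. If every receiver has exited between the check and the send, the send blocks forever. A minimal standalone sketch of both shapes, with illustrative names (ch, v) that are not taken from this commit:

package sketch

import "context"

// flushBuggy is the pre-fix shape: check for cancellation, then send.
// If all receivers have already exited by the time we reach the send,
// this goroutine blocks forever and the import appears stuck.
func flushBuggy(ctx context.Context, ch chan<- int, v int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	ch <- v // may block forever: nothing guarantees a receiver is still running
	return nil
}

// flushFixed is the post-fix shape: the send races the cancellation
// signal inside a single select, so cancellation can always interrupt
// a blocked send.
func flushFixed(ctx context.Context, ch chan<- int, v int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case ch <- v:
		return nil
	}
}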
Yevgeniy Miretskiy committed Jun 11, 2020
1 parent 92670dd commit 2982464
Showing 2 changed files with 158 additions and 10 deletions.
23 changes: 13 additions & 10 deletions pkg/ccl/importccl/read_import_base.go
@@ -394,7 +394,7 @@ type importFileContext struct {
 // handleCorruptRow reports an error encountered while processing a row
 // in an input file.
 func handleCorruptRow(ctx context.Context, fileCtx *importFileContext, err error) error {
-	log.Error(ctx, err)
+	log.Errorf(ctx, "%+v", err)

 	if rowErr, isRowErr := err.(*importRowError); isRowErr && fileCtx.rejected != nil {
 		fileCtx.rejected <- rowErr.row + "\n"
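
A note on the logging change above: errors from github.com/cockroachdb/errors implement fmt.Formatter, so the %+v verb renders wrapped causes and embedded stack traces that the short rendering omits. A small sketch of the difference (the error and messages here are illustrative, not from this commit):

package sketch

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

func demo() {
	err := errors.Wrap(errors.New("row failed"), "import")

	fmt.Printf("%v\n", err)  // short form: "import: row failed"
	fmt.Printf("%+v\n", err) // verbose form: adds wrapping details and stack traces
}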
@@ -544,7 +544,7 @@ func runParallelImport(
 		}

 		if producer.Err() == nil {
-			return importer.flush(ctx)
+			return importer.close(ctx)
 		}
 		return producer.Err()
 	})
@@ -568,22 +568,25 @@ func (p *parallelImporter) add(
 	return nil
 }

-// Flush flushes currently accumulated data.
+// close closes this importer, flushing remaining accumulated data if needed.
+func (p *parallelImporter) close(ctx context.Context) error {
+	if len(p.b.data) > 0 {
+		return p.flush(ctx)
+	}
+	return nil
+}
+
+// flush flushes currently accumulated data.
 func (p *parallelImporter) flush(ctx context.Context) error {
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
-	default:
-	}
-
-	// if the batch isn't empty, we need to flush it.
-	if len(p.b.data) > 0 {
-		p.recordCh <- p.b
+	case p.recordCh <- p.b:
 		p.b = batch{
 			data: make([]interface{}, 0, cap(p.b.data)),
 		}
+		return nil
 	}
-	return nil
 }

 func (p *parallelImporter) importWorker(
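
Design note on the hunk above: the old flush performed a non-blocking cancellation check and then sent on p.recordCh unconditionally, which is exactly the race described in the commit message. The new flush makes the send itself a select case against ctx.Done(), so cancellation can always interrupt a blocked send. And because flush now sends whatever batch it holds unconditionally, the emptiness check moves into the new close method, which only flushes when data remains.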
145 changes: 145 additions & 0 deletions pkg/ccl/importccl/read_import_base_test.go
@@ -9,9 +9,19 @@
 package importccl

 import (
+	"context"
+	"math/rand"
 	"testing"
+	"time"

+	"github.com/cockroachdb/cockroach/pkg/sql/row"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
 )

 func TestRejectedFilename(t *testing.T) {
@@ -42,3 +42,52 @@ func TestRejectedFilename(t *testing.T) {
		}
	}
}

// nilDataProducer produces an infinite stream of nulls.
// It implements importRowProducer.
type nilDataProducer struct{}

func (p *nilDataProducer) Scan() bool {
	return true
}

func (p *nilDataProducer) Err() error {
	return nil
}

func (p *nilDataProducer) Skip() error {
	return nil
}

func (p *nilDataProducer) Row() (interface{}, error) {
	return nil, nil
}

func (p *nilDataProducer) Progress() float32 {
	return 0.0
}

var _ importRowProducer = &nilDataProducer{}

// errorReturningConsumer always returns an error.
// It implements importRowConsumer.
type errorReturningConsumer struct {
	err error
}

func (d *errorReturningConsumer) FillDatums(
	_ interface{}, _ int64, c *row.DatumRowConverter,
) error {
	return d.err
}

var _ importRowConsumer = &errorReturningConsumer{}

// nilDataConsumer consumes and emits an infinite stream of nulls.
// It implements importRowConsumer.
type nilDataConsumer struct{}

func (n *nilDataConsumer) FillDatums(_ interface{}, _ int64, c *row.DatumRowConverter) error {
	c.Datums[0] = tree.DNull
	return nil
}

var _ importRowConsumer = &nilDataConsumer{}

func TestParallelImportProducerHandlesConsumerErrors(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Dummy descriptor for import
	descr := sqlbase.TableDescriptor{
		Name: "test",
		Columns: []sqlbase.ColumnDescriptor{
			{Name: "column", ID: 1, Type: *types.Int, Nullable: true},
		},
	}

	// Flush datum converter frequently
	defer row.TestingSetDatumRowConverterBatchSize(1)()

	// Create KV channel and arrange for it to be drained
	kvCh := make(chan row.KVBatch)
	defer close(kvCh)
	go func() {
		for range kvCh {
		}
	}()

	// Prepare import context, which flushes to kvCh frequently.
	importCtx := &parallelImportContext{
		numWorkers: 1,
		batchSize:  2,
		evalCtx:    testEvalCtx,
		tableDesc:  &descr,
		kvCh:       kvCh,
	}

	consumer := &errorReturningConsumer{errors.New("consumer aborted")}

	require.Equal(t, consumer.err,
		runParallelImport(context.Background(), importCtx,
			&importFileContext{}, &nilDataProducer{}, consumer))
}

func TestParallelImportProducerHandlesCancellation(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Dummy descriptor for import
	descr := sqlbase.TableDescriptor{
		Name: "test",
		Columns: []sqlbase.ColumnDescriptor{
			{Name: "column", ID: 1, Type: *types.Int, Nullable: true},
		},
	}

	// Flush datum converter frequently
	defer row.TestingSetDatumRowConverterBatchSize(1)()

	// Create KV channel and arrange for it to be drained
	kvCh := make(chan row.KVBatch)
	defer close(kvCh)
	go func() {
		for range kvCh {
		}
	}()

	// Prepare import context, which flushes to kvCh frequently.
	importCtx := &parallelImportContext{
		numWorkers: 1,
		batchSize:  2,
		evalCtx:    testEvalCtx,
		tableDesc:  &descr,
		kvCh:       kvCh,
	}

	// Run a hundred imports, each of which times out shortly after it starts.
	require.NoError(t, ctxgroup.GroupWorkers(context.Background(), 100,
		func(_ context.Context, _ int) error {
			timeout := time.Millisecond * time.Duration(250+rand.Intn(250))
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			defer func(f func()) {
				f()
			}(cancel)
			require.Equal(t, context.DeadlineExceeded,
				runParallelImport(ctx, importCtx,
					&importFileContext{}, &nilDataProducer{}, &nilDataConsumer{}))
			return nil
		}))
}
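
The randomized 250-500ms deadlines across a hundred concurrent imports are what give this test teeth: cancellation lands at varied points in the produce/flush cycle, so with the old check-then-send flush some runs would block forever rather than return context.DeadlineExceeded. (That reading follows from the commit message and the fixed flush above, not from any comment in the test itself.)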
