importccl: Correctly handle errors and cancellations during import. #49979

Merged · 1 commit · Jun 9, 2020
23 changes: 13 additions & 10 deletions pkg/ccl/importccl/read_import_base.go
@@ -395,7 +395,7 @@ type importFileContext struct {
 // handleCorruptRow reports an error encountered while processing a row
 // in an input file.
 func handleCorruptRow(ctx context.Context, fileCtx *importFileContext, err error) error {
-	log.Errorf(ctx, "%v", err)
+	log.Errorf(ctx, "%+v", err)
 
 	if rowErr := (*importRowError)(nil); errors.As(err, &rowErr) && fileCtx.rejected != nil {
 		fileCtx.rejected <- rowErr.row + "\n"
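
Worth noting why this one-character change to the logging verb matters: import row errors are built with github.com/cockroachdb/errors, and under that library "%v" renders only the message chain while "%+v" also renders wrapped details and stack traces. A small standalone sketch (illustrative code, not part of importccl):

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

func main() {
	// errors.New captures a stack trace; errors.Wrap adds a message prefix.
	err := errors.Wrap(errors.New("malformed row"), "reading import file")

	fmt.Printf("%v\n", err)  // reading import file: malformed row
	fmt.Printf("%+v\n", err) // same chain, plus error details and stack traces
}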
@@ -545,7 +545,7 @@ func runParallelImport(
 		}
 
 		if producer.Err() == nil {
-			return importer.flush(ctx)
+			return importer.close(ctx)
 		}
 		return producer.Err()
 	})
@@ -569,22 +569,25 @@ func (p *parallelImporter) add(
 	return nil
 }
 
-// Flush flushes currently accumulated data.
+// close closes this importer, flushing remaining accumulated data if needed.
+func (p *parallelImporter) close(ctx context.Context) error {
+	if len(p.b.data) > 0 {
+		return p.flush(ctx)
+	}
+	return nil
+}
+
+// flush flushes currently accumulated data.
 func (p *parallelImporter) flush(ctx context.Context) error {
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
-	default:
-	}
-
-	// if the batch isn't empty, we need to flush it.
-	if len(p.b.data) > 0 {
-		p.recordCh <- p.b
+	case p.recordCh <- p.b:
 		p.b = batch{
 			data: make([]interface{}, 0, cap(p.b.data)),
 		}
+		return nil
 	}
-	return nil
 }
 
 func (p *parallelImporter) importWorker(
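
The heart of the fix is in flush. Previously the context check was a non-blocking select (note the default case) followed by an unconditional send on p.recordCh, so once the consumer stopped draining the channel (after a worker error or a cancellation), a producer could block on that send forever. Folding the send into the select unblocks the producer the moment the context is done; and since flush now always sends, the new close method lets end-of-input skip sending an empty batch. A minimal standalone sketch of the pattern (sendBatch and the channel here are illustrative, not the importccl API):

package main

import (
	"context"
	"fmt"
	"time"
)

// sendBatch sends b on ch, but gives up as soon as ctx is canceled.
// Without the ctx case, the send would block forever once the receiver quits.
func sendBatch(ctx context.Context, ch chan<- int, b int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case ch <- b:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	ch := make(chan int) // unbuffered and never drained, like recordCh after workers exit
	fmt.Println(sendBatch(ctx, ch, 1)) // prints "context deadline exceeded"
}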
145 changes: 145 additions & 0 deletions pkg/ccl/importccl/read_import_base_test.go
@@ -9,9 +9,19 @@
 package importccl
 
 import (
+	"context"
+	"math/rand"
 	"testing"
+	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/sql/row"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
 )
 
 func TestRejectedFilename(t *testing.T) {
@@ -42,3 +52,138 @@ func TestRejectedFilename(t *testing.T) {
 		}
 	}
 }
+
+// nilDataProducer produces infinite stream of nulls.
+// It implements importRowProducer.
+type nilDataProducer struct{}
+
+func (p *nilDataProducer) Scan() bool {
+	return true
+}
+
+func (p *nilDataProducer) Err() error {
+	return nil
+}
+
+func (p *nilDataProducer) Skip() error {
+	return nil
+}
+
+func (p *nilDataProducer) Row() (interface{}, error) {
+	return nil, nil
+}
+
+func (p *nilDataProducer) Progress() float32 {
+	return 0.0
+}
+
+var _ importRowProducer = &nilDataProducer{}
+
+// errorReturningConsumer always returns an error.
+// It implements importRowConsumer.
+type errorReturningConsumer struct {
+	err error
+}
+
+func (d *errorReturningConsumer) FillDatums(
+	_ interface{}, _ int64, c *row.DatumRowConverter,
+) error {
+	return d.err
+}
+
+var _ importRowConsumer = &errorReturningConsumer{}
+
+// nilDataConsumer consumes and emits infinite stream of null.
+// it implements importRowConsumer.
+type nilDataConsumer struct{}
+
+func (n *nilDataConsumer) FillDatums(_ interface{}, _ int64, c *row.DatumRowConverter) error {
+	c.Datums[0] = tree.DNull
+	return nil
+}
+
+var _ importRowConsumer = &nilDataConsumer{}
+
+func TestParallelImportProducerHandlesConsumerErrors(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// Dummy descriptor for import
+	descr := sqlbase.TableDescriptor{
+		Name: "test",
+		Columns: []sqlbase.ColumnDescriptor{
+			{Name: "column", ID: 1, Type: types.Int, Nullable: true},
+		},
+	}
+
+	// Flush datum converter frequently
+	defer row.TestingSetDatumRowConverterBatchSize(1)()
+
+	// Create KV channel and arrange for it to be drained
+	kvCh := make(chan row.KVBatch)
+	defer close(kvCh)
+	go func() {
+		for range kvCh {
+		}
+	}()
+
+	// Prepare import context, which flushes to kvCh frequently.
+	importCtx := &parallelImportContext{
+		numWorkers: 1,
+		batchSize:  2,
+		evalCtx:    testEvalCtx,
+		tableDesc:  &descr,
+		kvCh:       kvCh,
+	}
+
+	consumer := &errorReturningConsumer{errors.New("consumer aborted")}
+
+	require.Equal(t, consumer.err,
+		runParallelImport(context.Background(), importCtx,
+			&importFileContext{}, &nilDataProducer{}, consumer))
+}
+
+func TestParallelImportProducerHandlesCancellation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// Dummy descriptor for import
+	descr := sqlbase.TableDescriptor{
+		Name: "test",
+		Columns: []sqlbase.ColumnDescriptor{
+			{Name: "column", ID: 1, Type: types.Int, Nullable: true},
+		},
+	}
+
+	// Flush datum converter frequently
+	defer row.TestingSetDatumRowConverterBatchSize(1)()
+
+	// Create KV channel and arrange for it to be drained
+	kvCh := make(chan row.KVBatch)
+	defer close(kvCh)
+	go func() {
+		for range kvCh {
+		}
+	}()
+
+	// Prepare import context, which flushes to kvCh frequently.
+	importCtx := &parallelImportContext{
+		numWorkers: 1,
+		batchSize:  2,
+		evalCtx:    testEvalCtx,
+		tableDesc:  &descr,
+		kvCh:       kvCh,
+	}
+
+	// Run a hundred imports, which will timeout shortly after they start.
+	require.NoError(t, ctxgroup.GroupWorkers(context.Background(), 100,
+		func(_ context.Context, _ int) error {
+			timeout := time.Millisecond * time.Duration(250+rand.Intn(250))
+			ctx, cancel := context.WithTimeout(context.Background(), timeout)
+			defer func(f func()) {
+				f()
+			}(cancel)
+			require.Equal(t, context.DeadlineExceeded,
+				runParallelImport(ctx, importCtx,
+					&importFileContext{}, &nilDataProducer{}, &nilDataConsumer{}))
+			return nil
+		}))
+}
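
A note on the test harness for readers outside the cockroach tree: pkg/util/ctxgroup is CockroachDB's context-aware take on the errgroup pattern, starting N workers, propagating cancellation, and returning the first error. A rough analogy using the standard golang.org/x/sync/errgroup (a sketch of the shape, not the ctxgroup implementation):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// groupWorkers runs num workers and returns the first non-nil error, if any.
func groupWorkers(ctx context.Context, num int, worker func(ctx context.Context, id int) error) error {
	g, gctx := errgroup.WithContext(ctx)
	for i := 0; i < num; i++ {
		id := i // capture the loop variable for the closure (pre-Go 1.22 semantics)
		g.Go(func() error { return worker(gctx, id) })
	}
	return g.Wait()
}

func main() {
	err := groupWorkers(context.Background(), 4, func(_ context.Context, id int) error {
		fmt.Println("worker", id)
		return nil
	})
	fmt.Println("done:", err)
}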