Skip to content

Commit

Permalink
Merge #101792
Browse files Browse the repository at this point in the history
101792: copy: fix extra flush loop causing test to flake r=cucaroach a=cucaroach

Release note: None
Epic: None
Fixes: #101610


Co-authored-by: Tommy Reilly <[email protected]>
  • Loading branch information
craig[bot] and cucaroach committed May 3, 2023
2 parents 9e0b06b + 1ccc551 commit e3efa88
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
4 changes: 4 additions & 0 deletions pkg/sql/copy/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,10 @@ func TestLargeDynamicRows(t *testing.T) {
batchNumber++
return nil
},
CopyFromInsertRetry: func() error {
batchNumber--
return nil
},
}
s, _, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/copy_from.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,9 @@ func (c *copyMachine) processCopyData(ctx context.Context, data string, final bo
// If we have a full batch of rows or we have exceeded maxRowMem process
// them. Only set finalBatch to true if this is the last
// CopyData segment AND we have no more data in the buffer.
if len := c.currentBatchSize(); c.rowsMemAcc.Used() > c.maxRowMem || len == c.copyBatchRowSize || batchDone {
if len := c.currentBatchSize(); len > 0 && (c.rowsMemAcc.Used() > c.maxRowMem || len >= c.copyBatchRowSize || batchDone) {
if len != c.copyBatchRowSize {
log.VEventf(ctx, 2, "copy batch of %d rows flushing due to memory usage %d > %d", c.batch.Length(), c.rowsMemAcc.Used(), c.maxRowMem)
log.VEventf(ctx, 2, "copy batch of %d rows flushing due to memory usage %d > %d", len, c.rowsMemAcc.Used(), c.maxRowMem)
}
if err := c.processRows(ctx, final && c.buf.Len() == 0); err != nil {
return err
Expand Down Expand Up @@ -1026,6 +1026,11 @@ func (c *copyMachine) insertRows(ctx context.Context, finalBatch bool) error {
// NOTE: in theory we can also retry if c.insertRows == 0.
if c.implicitTxn && !c.p.SessionData().CopyFromAtomicEnabled && c.p.SessionData().CopyFromRetriesEnabled && errIsRetriable(err) {
log.SqlExec.Infof(ctx, "%s failed on attempt %d and is retrying, error %+v", c.copyFromAST.String(), r.CurrentAttempt(), err)
if c.p.ExecCfg().TestingKnobs.CopyFromInsertRetry != nil {
if err := c.p.ExecCfg().TestingKnobs.CopyFromInsertRetry(); err != nil {
return err
}
}
continue
}
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,9 @@ type ExecutorTestingKnobs struct {
// BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement.
BeforeCopyFromInsert func() error

// CopyFromInsertRetry, if set, will be called when a COPY FROM insert statement is retried.
CopyFromInsertRetry func() error

// ForceSQLLivenessSession will force the use of a sqlliveness session for
// transaction deadlines even in the system tenant.
ForceSQLLivenessSession bool
Expand Down

0 comments on commit e3efa88

Please sign in to comment.