copy: replay atomic copies if we encounter retriable error
Previously, any error encountered during an atomic COPY was passed
straight back to the client, which could be very disruptive to large
migrations. Now we buffer up to sql.copy.retry.max_size bytes of the
COPY input, and if a retriable error is encountered we replay the COPY
from the beginning in a new transaction.
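
Illustrative sketch of the replay loop. The names here (copyWithRetry,
run, isRetriable) are hypothetical stand-ins, not the actual
conn_executor plumbing, and the real code buffers through the bounded
BufferingWriter added below rather than an unbounded bytes.Buffer:

package copysketch

import (
	"bytes"
	"context"
	"io"
)

// copyWithRetry runs a COPY, buffering the consumed input so that on a
// retriable error the whole stream can be replayed in a new transaction.
func copyWithRetry(
	ctx context.Context,
	run func(context.Context, io.Reader) error, // executes the COPY in one txn
	isRetriable func(error) bool,
	input io.Reader,
	limit int64, // cf. sql.copy.retry.max_size
) error {
	var buf bytes.Buffer
	for {
		// Replay anything already buffered, then keep consuming the live
		// input, buffering the newly consumed bytes as well.
		in := io.MultiReader(bytes.NewReader(buf.Bytes()), io.TeeReader(input, &buf))
		err := run(ctx, in)
		if err == nil || !isRetriable(err) || int64(buf.Len()) >= limit {
			// Success, a non-retriable error, or too big to replay.
			return err
		}
		// Otherwise: open a new transaction and replay from the beginning.
	}
}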

Release note (copy change): Atomic COPY commands can now be retried,
increasing the success rate of DMS operations, especially when atomic
COPY behavior is required.
Fixes: cockroachdb#99327
Epic: CRDB-25321
cucaroach committed Jun 13, 2023
1 parent 8ee3b6c commit e051ef2
Showing 6 changed files with 333 additions and 69 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -98,6 +98,7 @@ sql.closed_session_cache.time_to_live integer 3600 the maximum time to live, in
sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store tenant-rw
sql.contention.event_store.duration_threshold duration 0s minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events tenant-rw
sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use (set to 0 to disable) tenant-rw
sql.copy.retry.max_size byte size 128 MiB set to non-zero to enable automatic copy retry tenant-rw
sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed tenant-rw
sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed tenant-rw
sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed tenant-rw
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -132,6 +132,7 @@
<tr><td><div id="setting-sql-contention-event-store-capacity" class="anchored"><code>sql.contention.event_store.capacity</code></div></td><td>byte size</td><td><code>64 MiB</code></td><td>the in-memory storage capacity per-node of contention event store</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-contention-event-store-duration-threshold" class="anchored"><code>sql.contention.event_store.duration_threshold</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-contention-txn-id-cache-max-size" class="anchored"><code>sql.contention.txn_id_cache.max_size</code></div></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use (set to 0 to disable)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-copy-retry-max-size" class="anchored"><code>sql.copy.retry.max_size</code></div></td><td>byte size</td><td><code>128 MiB</code></td><td>set to non-zero to enable automatic copy retry</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-cross-db-fks-enabled" class="anchored"><code>sql.cross_db_fks.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, creating foreign key references across databases is allowed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-cross-db-sequence-owners-enabled" class="anchored"><code>sql.cross_db_sequence_owners.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, creating sequences owned by tables from other databases is allowed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-cross-db-sequence-references-enabled" class="anchored"><code>sql.cross_db_sequence_references.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, sequences referenced by tables from other databases are allowed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
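
As a usage note, the new setting can be tuned (or set to 0 to disable
retries) from any SQL session. A minimal sketch using pgx, the driver
the tests below use; the connection URL is a placeholder for a local
insecure cluster:

package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v4"
)

func main() {
	ctx := context.Background()
	// Placeholder URL; adjust for your cluster.
	conn, err := pgx.Connect(ctx,
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	// Raise the replay buffer to 256 MiB; '0' disables copy retries.
	if _, err := conn.Exec(ctx,
		"SET CLUSTER SETTING sql.copy.retry.max_size = '256MiB'",
	); err != nil {
		log.Fatal(err)
	}
}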
5 changes: 4 additions & 1 deletion pkg/sql/copy/BUILD.bazel
@@ -58,7 +58,10 @@ go_test(

go_library(
name = "copy",
srcs = ["reader.go"],
srcs = [
"buffer.go",
"reader.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/copy",
visibility = ["//visibility:public"],
deps = [
88 changes: 88 additions & 0 deletions pkg/sql/copy/buffer.go
@@ -0,0 +1,88 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package copy

import (
	"bytes"
	"io"
)

// chunkSize is the allocation unit for the buffer (64 KiB).
const chunkSize = 64 << 10

// BufferingWriter buffers everything written to it, up to Limit bytes,
// so that the stream can be replayed later via GetReader.
type BufferingWriter struct {
	// Limit is the size at which we stop buffering.
	Limit int64
	// Grow reserves memory for another chunk (e.g. against a memory
	// account) and returns an error if the reservation fails.
	Grow func(int) error
	// buffers holds readers over all the full chunks we've created.
	buffers []io.Reader
	// buf is the chunk we're currently writing to.
	buf []byte
}

// GetReader returns an io.Reader over all the bytes that have been written.
func (b *BufferingWriter) GetReader() io.Reader {
	if len(b.buf) > 0 {
		b.buffers = append(b.buffers, bytes.NewReader(b.buf))
	}
	return io.MultiReader(b.buffers...)
}

// Cap returns the amount of memory the buffer is using.
func (b *BufferingWriter) Cap() int {
	return chunkSize*len(b.buffers) + len(b.buf)
}

// grow seals the current chunk and starts a new one. If the memory
// reservation fails, the writer resets to its zero value, which turns
// buffering off.
func (b *BufferingWriter) grow() {
	if len(b.buf) > 0 {
		b.buffers = append(b.buffers, bytes.NewReader(b.buf))
	}
	if err := b.Grow(chunkSize); err != nil {
		// Reservation failed; the zero-value Limit disables buffering.
		*b = BufferingWriter{}
	} else {
		b.buf = make([]byte, 0, chunkSize)
	}
}

// Write implements the io.Writer interface.
func (b *BufferingWriter) Write(p []byte) (n int, err error) {
	// Once we cross the limit, stop buffering and drop everything.
	if int64(b.Cap()) >= b.Limit {
		*b = BufferingWriter{}
		return len(p), nil
	}
	if cap(b.buf)-len(b.buf) == 0 {
		b.grow()
	}
	buf := b.buf
	towrite := p
	off := len(buf)
	room := cap(buf) - off
	if room < len(p) {
		// Fill the current chunk; the remainder is written recursively.
		towrite = p[:room]
		p = p[room:]
	} else {
		p = nil
	}
	n = copy(buf[off:off+room], towrite)
	if n != len(towrite) {
		panic("BufferingWriter: short copy")
	}
	b.buf = buf[:off+n]
	if len(p) > 0 {
		e, err := b.Write(p)
		if err != nil {
			return 0, err
		}
		n += e
	}
	return n, nil
}
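
A hypothetical usage sketch of the new writer (the Grow callback and
the sample input are stand-ins; a real caller would reserve from a
memory account, and the import path assumes the cockroach repo is on
the module path):

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/cockroachdb/cockroach/pkg/sql/copy"
)

func main() {
	var reserved int
	w := copy.BufferingWriter{
		Limit: 1 << 20, // stop buffering once we reach 1 MiB
		// Grow stands in for a real memory-account reservation.
		Grow: func(n int) error {
			reserved += n
			return nil
		},
	}
	// Tee the COPY input through the writer so it can be replayed later.
	input := io.TeeReader(strings.NewReader("1\n2\n3\n"), &w)
	if _, err := io.Copy(io.Discard, input); err != nil {
		fmt.Println(err) // the first COPY attempt failed outright
		return
	}
	// After a retriable error we could replay from the beginning:
	replayed, _ := io.ReadAll(w.GetReader())
	fmt.Printf("reserved %d bytes, buffered %d for replay: %q\n",
		reserved, len(replayed), replayed)
}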
77 changes: 65 additions & 12 deletions pkg/sql/copy/copy_in_test.go
@@ -429,19 +429,22 @@ func TestCopyFromRetries(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numRows = sql.CopyBatchRowSizeDefault * 5
const numRowsDefault = sql.CopyBatchRowSizeDefault * 5

testCases := []struct {
desc string
hook func(attemptNum int) error
atomicEnabled bool
retriesEnabled bool
inTxn bool
expectedRows int
expectedErr bool
desc string
hook func(attemptNum int) error
atomicEnabled bool
retriesEnabled bool
inTxn bool
fastPathDisabled bool
expectedRows int
expectedErr bool
atomicCopyRetrySize int
numRows int
}{
{
desc: "failure in atomic transaction does not retry",
desc: "failure in small atomic copy does retry",
atomicEnabled: true,
retriesEnabled: true,
hook: func(attemptNum int) error {
@@ -450,7 +453,46 @@
}
return nil
},
expectedErr: true,
expectedRows: numRowsDefault,
},
{
desc: "failure in small atomic copy does retry no fastPath",
atomicEnabled: true,
retriesEnabled: true,
fastPathDisabled: true,
hook: func(attemptNum int) error {
if attemptNum == 1 {
return &kvpb.TransactionRetryWithProtoRefreshError{}
}
return nil
},
expectedRows: numRowsDefault,
},
{
desc: "failure in too big atomic copy does not retry small cap",
atomicEnabled: true,
retriesEnabled: true,
hook: func(attemptNum int) error {
if attemptNum == 1 {
return &kvpb.TransactionRetryWithProtoRefreshError{}
}
return nil
},
expectedErr: true,
atomicCopyRetrySize: 10,
},
{
desc: "succeed in too big atomic copy does retry large cap",
atomicEnabled: true,
retriesEnabled: true,
hook: func(attemptNum int) error {
if attemptNum == 100 {
return &kvpb.TransactionRetryWithProtoRefreshError{}
}
return nil
},
numRows: 30 << 10,
expectedRows: 30 << 10,
},
{
desc: "does not attempt to retry if disabled",
@@ -487,7 +529,7 @@
}
return nil
},
expectedRows: numRows,
expectedRows: numRowsDefault,
},
{
desc: "eventually dies on too many restarts",
@@ -522,6 +564,11 @@

ctx := context.Background()

if tc.atomicCopyRetrySize != 0 {
_, err := db.Exec(fmt.Sprintf("SET CLUSTER SETTING sql.copy.retry.max_size = '%d'", tc.atomicCopyRetrySize))
require.NoError(t, err)
}

// Use pgx instead of lib/pq as pgx doesn't require copy to be in a txn.
pgURL, cleanupGoDB := sqlutils.PGUrl(
t, s.ServingSQLAddr(), "StartServer" /* prefix */, url.User(username.RootUser))
@@ -532,9 +579,15 @@
require.NoError(t, err)
_, err = pgxConn.Exec(ctx, "SET copy_from_retries_enabled = $1", fmt.Sprintf("%t", tc.retriesEnabled))
require.NoError(t, err)
_, err = pgxConn.Exec(ctx, "SET copy_fast_path_enabled = $1", fmt.Sprintf("%t", !tc.fastPathDisabled))
require.NoError(t, err)

if err := func() error {
var rows [][]interface{}
numRows := numRowsDefault
if tc.numRows != 0 {
numRows = tc.numRows
}
for i := 0; i < numRows; i++ {
rows = append(rows, []interface{}{i})
}
Expand All @@ -556,7 +609,7 @@ func TestCopyFromRetries(t *testing.T) {
}
return txn.Commit(ctx)
}(); err != nil {
assert.True(t, tc.expectedErr, "got error %+v", err)
require.ErrorContains(t, err, "TransactionRetryWithProtoRefreshError")
} else {
assert.False(t, tc.expectedErr, "expected error but got none")
}
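
The fault-injection pattern these tests rely on — fail the first
attempt(s) with a retriable error, succeed afterwards — can be
sketched standalone (errRetriable is a stand-in for the real
kvpb.TransactionRetryWithProtoRefreshError):

package retrytest

import (
	"errors"
	"sync/atomic"
)

// errRetriable stands in for kvpb.TransactionRetryWithProtoRefreshError.
var errRetriable = errors.New("TransactionRetryWithProtoRefreshError")

// failFirstN returns a hook that injects a retriable error on the first
// n attempts and succeeds afterwards, mirroring the hooks above.
func failFirstN(n int64) func() error {
	var attempts int64
	return func() error {
		if atomic.AddInt64(&attempts, 1) <= n {
			return errRetriable
		}
		return nil
	}
}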
