sql: make copy atomic by default and add opt-in to old behavior
Previously, COPY under an implicit transaction ran in auto-committed
batches of 100 rows. That decision was made years ago, when support for
large transactions was much weaker than it is today. Now that large
transactions are handled well, make the default behavior atomic and
provide a session setting to restore the old behavior. In busy clusters
with large COPYs, contention and retries may still make non-atomic COPY
desirable.

Fixes: #85887

Release note (backward-incompatible change): COPY FROM operations are
now atomic by default instead of being segmented into 100-row
transactions. Set the copy_from_atomic_enabled session setting to
false to get the old behavior.

Release justification: low-risk, high-benefit correctness fix to
existing functionality
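
For illustration, a minimal client-side sketch of opting a session back into the old non-atomic behavior. It assumes the pgx v4 driver, a hypothetical table t (id INT, s STRING), and a placeholder connection URL; none of these come from this commit.

package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v4"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	// Opt back into the pre-change behavior: COPY commits in ~100-row
	// batches instead of as a single atomic transaction. Omit this
	// statement to keep the new atomic default.
	if _, err := conn.Exec(ctx, "SET copy_from_atomic_enabled = false"); err != nil {
		log.Fatal(err)
	}

	// COPY t FROM STDIN, driven through the driver's copy support.
	rows := [][]interface{}{{1, "first"}, {2, "second"}}
	n, err := conn.CopyFrom(ctx, pgx.Identifier{"t"}, []string{"id", "s"}, pgx.CopyFromRows(rows))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("copied %d rows", n)
}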
cucaroach committed Aug 25, 2022
1 parent 44e50c1 commit bef7580
Showing 14 changed files with 281 additions and 67 deletions.
25 changes: 19 additions & 6 deletions pkg/cmd/roachtest/tests/copyfrom.go
@@ -76,10 +76,10 @@ func initTest(ctx context.Context, t test.Test, c cluster.Cluster, sf int) {
); err != nil {
t.Fatal(err)
}
-csv := fmt.Sprintf(tpchLineitemFmt, sf)
-c.Run(ctx, c.Node(1), "rm -f /tmp/lineitem-table.csv")
-c.Run(ctx, c.Node(1), fmt.Sprintf("curl '%s' -o /tmp/lineitem-table.csv", csv))
-}
+csv := fmt.Sprintf(tpchLineitemFmt, sf)
+c.Run(ctx, c.Node(1), "rm -f /tmp/lineitem-table.csv")
+c.Run(ctx, c.Node(1), fmt.Sprintf("curl '%s' -o /tmp/lineitem-table.csv", csv))
+}

func runTest(ctx context.Context, t test.Test, c cluster.Cluster, pg string) {
@@ -121,10 +121,15 @@ func runCopyFromPG(ctx context.Context, t test.Test, c cluster.Cluster, sf int)
runTest(ctx, t, c, "sudo -i -u postgres psql")
}

-func runCopyFromCRDB(ctx context.Context, t test.Test, c cluster.Cluster, sf int) {
+func runCopyFromCRDB(ctx context.Context, t test.Test, c cluster.Cluster, sf int, atomic bool) {
c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.All())
initTest(ctx, t, c, sf)
+db, err := c.ConnE(ctx, t.L(), 1)
+require.NoError(t, err)
+stmt := fmt.Sprintf("ALTER ROLE ALL SET copy_from_atomic_enabled = %t", atomic)
+_, err = db.ExecContext(ctx, stmt)
+require.NoError(t, err)
urls, err := c.InternalPGUrl(ctx, t.L(), c.Node(1))
require.NoError(t, err)
m := c.NewMonitor(ctx, c.All())
@@ -150,11 +155,19 @@ func registerCopyFrom(r registry.Registry) {
for _, tc := range testcases {
tc := tc
r.Add(registry.TestSpec{
Name: fmt.Sprintf("copyfrom/crdb/sf=%d/nodes=%d", tc.sf, tc.nodes),
Name: fmt.Sprintf("copyfrom/crdb-atomic/sf=%d/nodes=%d", tc.sf, tc.nodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(tc.nodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+runCopyFromCRDB(ctx, t, c, tc.sf, true /*atomic*/)
+},
+})
+r.Add(registry.TestSpec{
+Name: fmt.Sprintf("copyfrom/crdb-nonatomic/sf=%d/nodes=%d", tc.sf, tc.nodes),
+Owner: registry.OwnerKV,
+Cluster: r.MakeClusterSpec(tc.nodes),
+Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-runCopyFromCRDB(ctx, t, c, tc.sf)
+runCopyFromCRDB(ctx, t, c, tc.sf, false /*atomic*/)
},
})
r.Add(registry.TestSpec{
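A note on the test setup above: ALTER ROLE ALL SET changes the session default that later connections pick up, which is why the test runs it before opening the connections that drive COPY. A hedged sketch for checking what a new session inherited; the driver and connection URL are assumptions, not part of the commit.

package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", "postgres://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// SHOW reports the session value, including any role default applied
	// via ALTER ROLE ALL SET when the connection was established.
	var enabled string
	if err := db.QueryRowContext(context.Background(), "SHOW copy_from_atomic_enabled").Scan(&enabled); err != nil {
		log.Fatal(err)
	}
	log.Printf("copy_from_atomic_enabled = %s", enabled) // "on" => atomic COPY
}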
25 changes: 1 addition & 24 deletions pkg/sql/conn_executor.go
@@ -2839,31 +2839,8 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
func (ex *connExecutor) resetPlanner(
ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time,
) {
-p.txn = txn
-p.stmt = Statement{}
-p.instrumentation = instrumentationHelper{}
-
-p.cancelChecker.Reset(ctx)
-
-p.semaCtx = tree.MakeSemaContext()
-p.semaCtx.SearchPath = &ex.sessionData().SearchPath
-p.semaCtx.Annotations = nil
-p.semaCtx.TypeResolver = p
-p.semaCtx.FunctionResolver = p
-p.semaCtx.TableNameResolver = p
-p.semaCtx.DateStyle = ex.sessionData().GetDateStyle()
-p.semaCtx.IntervalStyle = ex.sessionData().GetIntervalStyle()
-
+p.resetPlanner(ctx, txn, stmtTS, ex.sessionData())
ex.resetEvalCtx(&p.extendedEvalCtx, txn, stmtTS)

-p.autoCommit = false
-p.isPreparing = false
-
-p.schemaResolver.txn = txn
-p.schemaResolver.sessionDataStack = p.EvalContext().SessionDataStack
-p.evalCatalogBuiltins.Init(p.execCfg.Codec, txn, p.Descriptors())
-p.skipDescriptorCache = false
-p.typeResolutionDbID = descpb.InvalidID
}

// txnStateTransitionsApplyWrapper is a wrapper on top of Machine built with the
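The 24 deleted lines collapse into the single p.resetPlanner call, moving per-statement planner reset onto the planner itself so that code paths without a connExecutor (such as COPY batches) can reuse it. The helper's body is not part of this diff; reassembling the deleted lines under the new call's signature gives a rough sketch of what it must do, an approximation rather than the committed code:

func (p *planner) resetPlanner(
	ctx context.Context, txn *kv.Txn, stmtTS time.Time, sd *sessiondata.SessionData,
) {
	p.txn = txn
	p.stmt = Statement{}
	p.instrumentation = instrumentationHelper{}

	p.cancelChecker.Reset(ctx)

	p.semaCtx = tree.MakeSemaContext()
	p.semaCtx.SearchPath = &sd.SearchPath
	p.semaCtx.Annotations = nil
	p.semaCtx.TypeResolver = p
	p.semaCtx.FunctionResolver = p
	p.semaCtx.TableNameResolver = p
	p.semaCtx.DateStyle = sd.GetDateStyle()
	p.semaCtx.IntervalStyle = sd.GetIntervalStyle()

	p.autoCommit = false
	p.isPreparing = false

	p.schemaResolver.txn = txn
	p.schemaResolver.sessionDataStack = p.EvalContext().SessionDataStack
	p.evalCatalogBuiltins.Init(p.execCfg.Codec, txn, p.Descriptors())
	p.skipDescriptorCache = false
	p.typeResolutionDbID = descpb.InvalidID
}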
47 changes: 32 additions & 15 deletions pkg/sql/copy.go
@@ -121,9 +121,14 @@ type copyMachine struct {
// other things that statements more generally need.
parsingEvalCtx *eval.Context

-processRows func(ctx context.Context) error
+processRows func(ctx context.Context, finalBatch bool) error

scratchRow []tree.Datum

+// For testing we want to be able to override this on the instance level.
+copyBatchRowSize int
+
+implicitTxn bool
}

// newCopyMachine creates a new copyMachine.
@@ -147,11 +152,11 @@ func newCopyMachine(
csvExpectHeader: n.Options.Header,
p: p,
execInsertPlan: execInsertPlan,
+implicitTxn: txnOpt.txn == nil,
}

// We need a planner to do the initial planning, in addition
// to those used for the main execution of the COPY afterwards.
-cleanup := c.p.preparePlannerForCopy(ctx, txnOpt)
+cleanup := c.p.preparePlannerForCopy(ctx, &c.txnOpt, false /* finalBatch */, c.implicitTxn)
defer func() {
retErr = cleanup(ctx, retErr)
}()
@@ -283,6 +288,7 @@ func (c *copyMachine) initMonitoring(ctx context.Context, parentMon *mon.BytesMo
c.copyMon.StartNoReserved(ctx, parentMon)
c.bufMemAcc = c.copyMon.MakeBoundAccount()
c.rowsMemAcc = c.copyMon.MakeBoundAccount()
+c.copyBatchRowSize = copyBatchRowSize
}

// copyTxnOpt contains information about the transaction in which the copying
@@ -456,10 +462,10 @@ func (c *copyMachine) processCopyData(ctx context.Context, data string, final bo
}
}
// Only do work if we have a full batch of rows or this is the end.
-if ln := c.rows.Len(); !final && (ln == 0 || ln < copyBatchRowSize) {
+if ln := c.rows.Len(); !final && (ln == 0 || ln < c.copyBatchRowSize) {
return nil
}
-return c.processRows(ctx)
+return c.processRows(ctx, final)
}

func (c *copyMachine) readTextData(ctx context.Context, final bool) (brk bool, err error) {
@@ -582,8 +588,7 @@ func (c *copyMachine) readCSVTuple(ctx context.Context, record []csv.Record) err

datums[i] = d
}
-_, err := c.rows.AddRow(ctx, datums)
-if err != nil {
+if _, err := c.rows.AddRow(ctx, datums); err != nil {
return err
}
return nil
@@ -715,22 +720,34 @@ func (c *copyMachine) readBinarySignature() ([]byte, error) {
// an error. If an error is passed in to the cleanup function, the
// same error is returned.
func (p *planner) preparePlannerForCopy(
-ctx context.Context, txnOpt copyTxnOpt,
+ctx context.Context, txnOpt *copyTxnOpt, finalBatch bool, implicitTxn bool,
) func(context.Context, error) error {
txn := txnOpt.txn
txnTs := txnOpt.txnTimestamp
stmtTs := txnOpt.stmtTimestamp
-autoCommit := false
+autoCommit := finalBatch && implicitTxn
if txn == nil {
nodeID, _ := p.execCfg.NodeInfo.NodeID.OptionalNodeID()
// The session data stack in the planner is not set up at this point, so use
// the default Normal QoSLevel.
txn = kv.NewTxnWithSteppingEnabled(ctx, p.execCfg.DB, nodeID, sessiondatapb.Normal)
txnTs = p.execCfg.Clock.PhysicalTime()
stmtTs = txnTs
-autoCommit = true

}
txnOpt.resetPlanner(ctx, p, txn, txnTs, stmtTs)
+if implicitTxn {
+// For atomic implicit COPY remember txn for next time so we don't start a new one.
+if p.SessionData().CopyFromAtomicEnabled {
+txnOpt.txn = txn
+txnOpt.txnTimestamp = txnTs
+txnOpt.stmtTimestamp = txnTs
+autoCommit = finalBatch
+} else {
+// We're doing original behavior of committing each batch.
+autoCommit = true
+}
+}
p.autoCommit = autoCommit

return func(ctx context.Context, prevErr error) (err error) {
@@ -754,14 +771,14 @@ func (p *planner) preparePlannerForCopy(
}

// insertRows transforms the buffered rows into an insertNode and executes it.
-func (c *copyMachine) insertRows(ctx context.Context) (retErr error) {
-if c.rows.Len() == 0 {
-return nil
-}
-cleanup := c.p.preparePlannerForCopy(ctx, c.txnOpt)
+func (c *copyMachine) insertRows(ctx context.Context, finalBatch bool) (retErr error) {
+cleanup := c.p.preparePlannerForCopy(ctx, &c.txnOpt, finalBatch, c.implicitTxn)
defer func() {
retErr = cleanup(ctx, retErr)
}()
+if c.rows.Len() == 0 {
+return nil
+}
numRows := c.rows.Len()

copyFastPath := c.p.SessionData().CopyFastPathEnabled
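The auto-commit rules that preparePlannerForCopy now encodes can be distilled into a standalone truth function. Note also that insertRows now calls preparePlannerForCopy before the early return on an empty buffer, so a final batch with zero buffered rows still commits the implicit transaction. The sketch below is an illustrative restatement, not CRDB code; all names are invented.

package main

import "fmt"

// copyAutoCommit mirrors the decision made in preparePlannerForCopy above.
func copyAutoCommit(implicitTxn, atomicEnabled, finalBatch bool) bool {
	if !implicitTxn {
		// Explicit transaction: the user commits; COPY never auto-commits.
		return false
	}
	if atomicEnabled {
		// New default: one transaction spans all batches and commits only
		// once the final batch has been inserted.
		return finalBatch
	}
	// Old opt-in behavior: every batch commits on its own.
	return true
}

func main() {
	fmt.Println(copyAutoCommit(true, true, false))  // false: batch stays in the COPY txn
	fmt.Println(copyAutoCommit(true, true, true))   // true: final batch commits everything
	fmt.Println(copyAutoCommit(true, false, false)) // true: non-atomic mode commits each batch
}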
4 changes: 2 additions & 2 deletions pkg/sql/copy_file_upload.go
@@ -98,7 +98,7 @@ func newFileUploadMachine(

// We need a planner to do the initial planning, even if a planner
// is not required after that.
-cleanup := c.p.preparePlannerForCopy(ctx, txnOpt)
+cleanup := c.p.preparePlannerForCopy(ctx, &txnOpt, false /* finalBatch */, c.implicitTxn)
defer func() {
retErr = cleanup(ctx, retErr)
}()
@@ -191,7 +191,7 @@ func (f *fileUploadMachine) run(ctx context.Context) error {
return err
}

-func (f *fileUploadMachine) writeFile(ctx context.Context) error {
+func (f *fileUploadMachine) writeFile(ctx context.Context, finalBatch bool) error {
for i := 0; i < f.c.rows.Len(); i++ {
r := f.c.rows.At(i)
b := []byte(*r[0].(*tree.DBytes))
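writeFile gains the finalBatch parameter only to keep matching the processRows callback signature changed in copy.go; file uploads are unaffected by batching, so the parameter goes unused here. The wiring presumably looks like the line below, an assumption inferred from the signatures rather than something shown in this diff:

c.processRows = f.writeFile // func(ctx context.Context, finalBatch bool) error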