Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
85986: sql: make copy atomic by default and add optin to old behavior r=cucaroach a=cucaroach

Previously COPY under and implicit transaction would use auto committed
batches of 100 rows. This decision was made years ago when large txn
support wasn't what it is now. Today we handle large transactions better
so make the default bahavior atomic and provide a cluster setting to get
back the old behavior. Its possible in busy clusters with large COPY's
there may be contention and retries that make non-atomic copy desirable.

Fixes: #85887

Release note (sql change): COPY FROM operations are now atomic by
default instead of being segmented into 100 row transactions. Setting
the sql.copyfrom.atomic cluster setting to false to get the old
behavior.

Release justification: low risk high benefit correctness fix to existing
functionality

86189: storage/metamorphic: add TestCompareFiles r=nicktrav a=jbowens

Add a new MVCC metamorphic test entrypoint, TestCompareFiles, that takes a
check file through `--check` and one or more output.metas through
`--compare-files`. The test runs the configurations specified through
`--compare-files` against the specified `--check` file, parsing out the encoded
Pebble options.

Release note: None
Release justification: Non-production code changes

86317: ui: introduce schema insights page on db-console r=THardy98 a=THardy98

Resolves: #83828, [#83829](#83829)

This change introduces the schema insights page to the DB-Console.
Schema insights are fetched from `schemaInsightsApi` using the SQL-over
HTTP API and the corresponding component `schemaInsightsView` is
available from cluster-ui for future use in the CC console. The schema
insights page display a table of schema insights - currently different
types of index recommendations (i.e. drop/create/replace index
recommendations), with the intention to add different types of schema
insights in the future. Each schema insight row offers an actionable
button, offering the user the ability to execute the corresponding SQL
query that realizes their schema insight. Filters are available to
filter by database and the schema insight type, as well as search.

~~Loom: https://www.loom.com/share/29ac973730614968893c179f4974fc61~~
Updated Loom: https://www.loom.com/share/ee36842fa9594c8888523d9ce41e1607
(this demo shows a known bug in the duration formatting on the index details page when the duration is set to <1 hour, #85222)

Release note (ui change): Added new Schema Insights page to DB Console.
The Schema Insights page displays a table of schema insights - currently
different types of index recommendations (i.e. drop/create/replace index
recommendations). Each schema insight row offers the user the ability to
execute the corresponding SQL query that realizes their schema insight
via a clickable button. Filters are available to filter the surfaced
schema insights by database and insight type, as well as search.

Release justification: low risk, high benefit changes to existing
functionality

Co-authored-by: Tommy Reilly <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: Thomas Hardy <[email protected]>
  • Loading branch information
4 people committed Aug 25, 2022
4 parents 6b3cd4b + bef7580 + d290dc5 + 2be29c6 commit 5c0af43
Show file tree
Hide file tree
Showing 56 changed files with 2,238 additions and 178 deletions.
25 changes: 19 additions & 6 deletions pkg/cmd/roachtest/tests/copyfrom.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func initTest(ctx context.Context, t test.Test, c cluster.Cluster, sf int) {
); err != nil {
t.Fatal(err)
}
csv := fmt.Sprintf(tpchLineitemFmt, sf)
c.Run(ctx, c.Node(1), "rm -f /tmp/lineitem-table.csv")
c.Run(ctx, c.Node(1), fmt.Sprintf("curl '%s' -o /tmp/lineitem-table.csv", csv))
}
csv := fmt.Sprintf(tpchLineitemFmt, sf)
c.Run(ctx, c.Node(1), "rm -f /tmp/lineitem-table.csv")
c.Run(ctx, c.Node(1), fmt.Sprintf("curl '%s' -o /tmp/lineitem-table.csv", csv))
}

func runTest(ctx context.Context, t test.Test, c cluster.Cluster, pg string) {
Expand Down Expand Up @@ -121,10 +121,15 @@ func runCopyFromPG(ctx context.Context, t test.Test, c cluster.Cluster, sf int)
runTest(ctx, t, c, "sudo -i -u postgres psql")
}

func runCopyFromCRDB(ctx context.Context, t test.Test, c cluster.Cluster, sf int) {
func runCopyFromCRDB(ctx context.Context, t test.Test, c cluster.Cluster, sf int, atomic bool) {
c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.All())
initTest(ctx, t, c, sf)
db, err := c.ConnE(ctx, t.L(), 1)
require.NoError(t, err)
stmt := fmt.Sprintf("ALTER ROLE ALL SET copy_from_atomic_enabled = %t", atomic)
_, err = db.ExecContext(ctx, stmt)
require.NoError(t, err)
urls, err := c.InternalPGUrl(ctx, t.L(), c.Node(1))
require.NoError(t, err)
m := c.NewMonitor(ctx, c.All())
Expand All @@ -150,11 +155,19 @@ func registerCopyFrom(r registry.Registry) {
for _, tc := range testcases {
tc := tc
r.Add(registry.TestSpec{
Name: fmt.Sprintf("copyfrom/crdb/sf=%d/nodes=%d", tc.sf, tc.nodes),
Name: fmt.Sprintf("copyfrom/crdb-atomic/sf=%d/nodes=%d", tc.sf, tc.nodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(tc.nodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCopyFromCRDB(ctx, t, c, tc.sf, true /*atomic*/)
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("copyfrom/crdb-nonatomic/sf=%d/nodes=%d", tc.sf, tc.nodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(tc.nodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCopyFromCRDB(ctx, t, c, tc.sf)
runCopyFromCRDB(ctx, t, c, tc.sf, false /*atomic*/)
},
})
r.Add(registry.TestSpec{
Expand Down
25 changes: 1 addition & 24 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2839,31 +2839,8 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
func (ex *connExecutor) resetPlanner(
ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time,
) {
p.txn = txn
p.stmt = Statement{}
p.instrumentation = instrumentationHelper{}

p.cancelChecker.Reset(ctx)

p.semaCtx = tree.MakeSemaContext()
p.semaCtx.SearchPath = &ex.sessionData().SearchPath
p.semaCtx.Annotations = nil
p.semaCtx.TypeResolver = p
p.semaCtx.FunctionResolver = p
p.semaCtx.TableNameResolver = p
p.semaCtx.DateStyle = ex.sessionData().GetDateStyle()
p.semaCtx.IntervalStyle = ex.sessionData().GetIntervalStyle()

p.resetPlanner(ctx, txn, stmtTS, ex.sessionData())
ex.resetEvalCtx(&p.extendedEvalCtx, txn, stmtTS)

p.autoCommit = false
p.isPreparing = false

p.schemaResolver.txn = txn
p.schemaResolver.sessionDataStack = p.EvalContext().SessionDataStack
p.evalCatalogBuiltins.Init(p.execCfg.Codec, txn, p.Descriptors())
p.skipDescriptorCache = false
p.typeResolutionDbID = descpb.InvalidID
}

// txnStateTransitionsApplyWrapper is a wrapper on top of Machine built with the
Expand Down
47 changes: 32 additions & 15 deletions pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,14 @@ type copyMachine struct {
// other things that statements more generally need.
parsingEvalCtx *eval.Context

processRows func(ctx context.Context) error
processRows func(ctx context.Context, finalBatch bool) error

scratchRow []tree.Datum

// For testing we want to be able to override this on the instance level.
copyBatchRowSize int

implicitTxn bool
}

// newCopyMachine creates a new copyMachine.
Expand All @@ -147,11 +152,11 @@ func newCopyMachine(
csvExpectHeader: n.Options.Header,
p: p,
execInsertPlan: execInsertPlan,
implicitTxn: txnOpt.txn == nil,
}

// We need a planner to do the initial planning, in addition
// to those used for the main execution of the COPY afterwards.
cleanup := c.p.preparePlannerForCopy(ctx, txnOpt)
cleanup := c.p.preparePlannerForCopy(ctx, &c.txnOpt, false /* finalBatch */, c.implicitTxn)
defer func() {
retErr = cleanup(ctx, retErr)
}()
Expand Down Expand Up @@ -283,6 +288,7 @@ func (c *copyMachine) initMonitoring(ctx context.Context, parentMon *mon.BytesMo
c.copyMon.StartNoReserved(ctx, parentMon)
c.bufMemAcc = c.copyMon.MakeBoundAccount()
c.rowsMemAcc = c.copyMon.MakeBoundAccount()
c.copyBatchRowSize = copyBatchRowSize
}

// copyTxnOpt contains information about the transaction in which the copying
Expand Down Expand Up @@ -456,10 +462,10 @@ func (c *copyMachine) processCopyData(ctx context.Context, data string, final bo
}
}
// Only do work if we have a full batch of rows or this is the end.
if ln := c.rows.Len(); !final && (ln == 0 || ln < copyBatchRowSize) {
if ln := c.rows.Len(); !final && (ln == 0 || ln < c.copyBatchRowSize) {
return nil
}
return c.processRows(ctx)
return c.processRows(ctx, final)
}

func (c *copyMachine) readTextData(ctx context.Context, final bool) (brk bool, err error) {
Expand Down Expand Up @@ -582,8 +588,7 @@ func (c *copyMachine) readCSVTuple(ctx context.Context, record []csv.Record) err

datums[i] = d
}
_, err := c.rows.AddRow(ctx, datums)
if err != nil {
if _, err := c.rows.AddRow(ctx, datums); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -715,22 +720,34 @@ func (c *copyMachine) readBinarySignature() ([]byte, error) {
// an error. If an error is passed in to the cleanup function, the
// same error is returned.
func (p *planner) preparePlannerForCopy(
ctx context.Context, txnOpt copyTxnOpt,
ctx context.Context, txnOpt *copyTxnOpt, finalBatch bool, implicitTxn bool,
) func(context.Context, error) error {
txn := txnOpt.txn
txnTs := txnOpt.txnTimestamp
stmtTs := txnOpt.stmtTimestamp
autoCommit := false
autoCommit := finalBatch && implicitTxn
if txn == nil {
nodeID, _ := p.execCfg.NodeInfo.NodeID.OptionalNodeID()
// The session data stack in the planner is not set up at this point, so use
// the default Normal QoSLevel.
txn = kv.NewTxnWithSteppingEnabled(ctx, p.execCfg.DB, nodeID, sessiondatapb.Normal)
txnTs = p.execCfg.Clock.PhysicalTime()
stmtTs = txnTs
autoCommit = true

}
txnOpt.resetPlanner(ctx, p, txn, txnTs, stmtTs)
if implicitTxn {
// For atomic implicit COPY remember txn for next time so we don't start a new one.
if p.SessionData().CopyFromAtomicEnabled {
txnOpt.txn = txn
txnOpt.txnTimestamp = txnTs
txnOpt.stmtTimestamp = txnTs
autoCommit = finalBatch
} else {
// We're doing original behavior of committing each batch.
autoCommit = true
}
}
p.autoCommit = autoCommit

return func(ctx context.Context, prevErr error) (err error) {
Expand All @@ -754,14 +771,14 @@ func (p *planner) preparePlannerForCopy(
}

// insertRows transforms the buffered rows into an insertNode and executes it.
func (c *copyMachine) insertRows(ctx context.Context) (retErr error) {
if c.rows.Len() == 0 {
return nil
}
cleanup := c.p.preparePlannerForCopy(ctx, c.txnOpt)
func (c *copyMachine) insertRows(ctx context.Context, finalBatch bool) (retErr error) {
cleanup := c.p.preparePlannerForCopy(ctx, &c.txnOpt, finalBatch, c.implicitTxn)
defer func() {
retErr = cleanup(ctx, retErr)
}()
if c.rows.Len() == 0 {
return nil
}
numRows := c.rows.Len()

copyFastPath := c.p.SessionData().CopyFastPathEnabled
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func newFileUploadMachine(

// We need a planner to do the initial planning, even if a planner
// is not required after that.
cleanup := c.p.preparePlannerForCopy(ctx, txnOpt)
cleanup := c.p.preparePlannerForCopy(ctx, &txnOpt, false /* finalBatch */, c.implicitTxn)
defer func() {
retErr = cleanup(ctx, retErr)
}()
Expand Down Expand Up @@ -191,7 +191,7 @@ func (f *fileUploadMachine) run(ctx context.Context) error {
return err
}

func (f *fileUploadMachine) writeFile(ctx context.Context) error {
func (f *fileUploadMachine) writeFile(ctx context.Context, finalBatch bool) error {
for i := 0; i < f.c.rows.Len(); i++ {
r := f.c.rows.At(i)
b := []byte(*r[0].(*tree.DBytes))
Expand Down
Loading

0 comments on commit 5c0af43

Please sign in to comment.