Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
81596: row: remove unnecessary mutation value marshalling r=yuzefovich a=jordanlewis

Closes cockroachdb#74741

This commit removes a little bit of unnecessary extra marshalling work
during the INSERT/UPDATE operations. Previously, we had to always
marshal all columns with the old encoding for now particularly good
reason. Now, we only run that old encoding marshalling when we need it:
aka, when there is a non-zero column family with just a single column in
it.

Release note: None

82182: awsdms: test endpoint before creating the replication task r=rafiss a=otan

The endpoint connection must successfully have been tested before the
replication task can be created. This usually happens before the
replication task is created, but sometimes it may not.

Avoid this race by proactively testing the connection and only start the
replication task after the endpoint connection test was successful.

Resolves cockroachdb#82057

Release note: None

Co-authored-by: Jordan Lewis <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
3 people committed Jun 1, 2022
3 parents 1a707a1 + 30e5318 + 091de23 commit 5a54758
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 51 deletions.
21 changes: 21 additions & 0 deletions pkg/cmd/roachtest/tests/awsdms.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,27 @@ func setupDMSEndpointsAndTask(
return err
}
*ep.arn = *epOut.Endpoint.EndpointArn

t.L().Printf("testing replication endpoint %s", *ep.in.EndpointIdentifier)
r := retry.StartWithCtx(ctx, retry.Options{
InitialBackoff: 30 * time.Second,
MaxBackoff: time.Minute,
MaxRetries: 10,
})
var lastErr error
for r.Next() {
_, lastErr = dmsCli.TestConnection(ctx, &dms.TestConnectionInput{
EndpointArn: epOut.Endpoint.EndpointArn,
ReplicationInstanceArn: proto.String(replicationARN),
})
if lastErr == nil {
break
}
t.L().Printf("replication endpoint test failed, retrying: %s", lastErr)
}
if lastErr != nil {
return lastErr
}
}

t.L().Printf("creating replication task")
Expand Down
25 changes: 4 additions & 21 deletions pkg/sql/row/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -33,10 +32,9 @@ type Inserter struct {
InsertColIDtoRowIndex catalog.TableColMap

// For allocation avoidance.
marshaled []roachpb.Value
key roachpb.Key
valueBuf []byte
value roachpb.Value
key roachpb.Key
valueBuf []byte
value roachpb.Value
}

// MakeInserter creates a Inserter for the given table.
Expand All @@ -61,7 +59,6 @@ func MakeInserter(

InsertCols: insertCols,
InsertColIDtoRowIndex: ColIDtoRowIndexFromCols(insertCols),
marshaled: make([]roachpb.Value, len(insertCols)),
}

for i := 0; i < tableDesc.GetPrimaryIndex().NumKeyColumns(); i++ {
Expand Down Expand Up @@ -141,20 +138,6 @@ func (ri *Inserter) InsertRow(
putFn = insertPutFn
}

// Encode the values to the expected column type. This needs to
// happen before index encoding because certain datum types (i.e. tuple)
// cannot be used as index values.
//
// TODO(radu): the legacy marshaling is used only in rare cases; this is
// wasteful.
for i, val := range values {
// Make sure the value can be written to the column before proceeding.
var err error
if ri.marshaled[i], err = valueside.MarshalLegacy(ri.InsertCols[i].GetType(), val); err != nil {
return err
}
}

// We don't want to insert any empty k/v's, so set includeEmpty to false.
// Consider the following case:
// TABLE t (
Expand All @@ -177,7 +160,7 @@ func (ri *Inserter) InsertRow(
ri.valueBuf, err = prepareInsertOrUpdateBatch(ctx, b,
&ri.Helper, primaryIndexKey, ri.InsertCols,
values, ri.InsertColIDtoRowIndex,
ri.marshaled, ri.InsertColIDtoRowIndex,
ri.InsertColIDtoRowIndex,
&ri.key, &ri.value, ri.valueBuf, putFn, overwrite, traceKV)
if err != nil {
return err
Expand Down
17 changes: 1 addition & 16 deletions pkg/sql/row/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -48,7 +47,6 @@ type Updater struct {
ri Inserter

// For allocation avoidance.
marshaled []roachpb.Value
newValues []tree.Datum
key roachpb.Key
valueBuf []byte
Expand Down Expand Up @@ -175,7 +173,6 @@ func MakeUpdater(
UpdateCols: updateCols,
UpdateColIDtoRowIndex: updateColIDtoRowIndex,
primaryKeyColChange: primaryKeyColChange,
marshaled: make([]roachpb.Value, len(updateCols)),
oldIndexEntries: make([][]rowenc.IndexEntry, len(includeIndexes)),
newIndexEntries: make([][]rowenc.IndexEntry, len(includeIndexes)),
}
Expand Down Expand Up @@ -242,18 +239,6 @@ func (ru *Updater) UpdateRow(
}
}

// Check that the new value types match the column types. This needs to
// happen before index encoding because certain datum types (i.e. tuple)
// cannot be used as index values.
//
// TODO(radu): the legacy marshaling is used only in rare cases; this is
// wasteful.
for i, val := range updateValues {
if ru.marshaled[i], err = valueside.MarshalLegacy(ru.UpdateCols[i].GetType(), val); err != nil {
return nil, err
}
}

// Update the row values.
copy(ru.newValues, oldValues)
for i, updateCol := range ru.UpdateCols {
Expand Down Expand Up @@ -381,7 +366,7 @@ func (ru *Updater) UpdateRow(
ru.valueBuf, err = prepareInsertOrUpdateBatch(ctx, batch,
&ru.Helper, primaryIndexKey, ru.FetchCols,
ru.newValues, ru.FetchColIDtoRowIndex,
ru.marshaled, ru.UpdateColIDtoRowIndex,
ru.UpdateColIDtoRowIndex,
&ru.key, &ru.value, ru.valueBuf, insertPutFn, true /* overwrite */, traceKV)
if err != nil {
return nil, err
Expand Down
30 changes: 16 additions & 14 deletions pkg/sql/row/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,10 @@ func ColMapping(fromCols, toCols []catalog.Column) []int {
// - fetchedCols is the list of schema columns that have been fetched
// in preparation for this update.
// - values is the SQL-level row values that are being written.
// - marshaledValues contains the pre-encoded KV-level row values.
// marshaledValues is only used when writing single column families.
// Regardless of whether there are single column families,
// pre-encoding must occur prior to calling this function to check whether
// the encoding is _possible_ (i.e. values fit in the column types, etc).
// - valColIDMapping/marshaledColIDMapping is the mapping from column
// IDs into positions of the slices values or marshaledValues.
// - valColIDMapping is the mapping from column IDs into positions of the slice
// values.
// - updatedColIDMapping is the mapping from column IDs into the positions of
// the updated values.
// - kvKey and kvValues must be heap-allocated scratch buffers to write
// roachpb.Key and roachpb.Value values.
// - rawValueBuf must be a scratch byte array. This must be reinitialized
Expand All @@ -97,8 +94,7 @@ func prepareInsertOrUpdateBatch(
fetchedCols []catalog.Column,
values []tree.Datum,
valColIDMapping catalog.TableColMap,
marshaledValues []roachpb.Value,
marshaledColIDMapping catalog.TableColMap,
updatedColIDMapping catalog.TableColMap,
kvKey *roachpb.Key,
kvValue *roachpb.Value,
rawValueBuf []byte,
Expand All @@ -110,7 +106,7 @@ func prepareInsertOrUpdateBatch(
family := &families[i]
update := false
for _, colID := range family.ColumnIDs {
if _, ok := marshaledColIDMapping.Get(colID); ok {
if _, ok := updatedColIDMapping.Get(colID); ok {
update = true
break
}
Expand Down Expand Up @@ -139,12 +135,18 @@ func prepareInsertOrUpdateBatch(
// Storage optimization to store DefaultColumnID directly as a value. Also
// backwards compatible with the original BaseFormatVersion.

idx, ok := marshaledColIDMapping.Get(family.DefaultColumnID)
idx, ok := valColIDMapping.Get(family.DefaultColumnID)
if !ok {
continue
}

if marshaledValues[idx].RawBytes == nil {
typ := fetchedCols[idx].GetType()
marshaled, err := valueside.MarshalLegacy(typ, values[idx])
if err != nil {
return nil, err
}

if marshaled.RawBytes == nil {
if overwrite {
// If the new family contains a NULL value, then we must
// delete any pre-existing row.
Expand All @@ -154,10 +156,10 @@ func prepareInsertOrUpdateBatch(
// We only output non-NULL values. Non-existent column keys are
// considered NULL during scanning and the row sentinel ensures we know
// the row exists.
if err := helper.checkRowSize(ctx, kvKey, &marshaledValues[idx], family.ID); err != nil {
if err := helper.checkRowSize(ctx, kvKey, &marshaled, family.ID); err != nil {
return nil, err
}
putFn(ctx, batch, kvKey, &marshaledValues[idx], traceKV)
putFn(ctx, batch, kvKey, &marshaled, traceKV)
}

continue
Expand Down

0 comments on commit 5a54758

Please sign in to comment.