diff --git a/pkg/cmd/roachtest/tests/awsdms.go b/pkg/cmd/roachtest/tests/awsdms.go index 001bcf602582..b73fbb0e73c1 100644 --- a/pkg/cmd/roachtest/tests/awsdms.go +++ b/pkg/cmd/roachtest/tests/awsdms.go @@ -486,6 +486,27 @@ func setupDMSEndpointsAndTask( return err } *ep.arn = *epOut.Endpoint.EndpointArn + + t.L().Printf("testing replication endpoint %s", *ep.in.EndpointIdentifier) + r := retry.StartWithCtx(ctx, retry.Options{ + InitialBackoff: 30 * time.Second, + MaxBackoff: time.Minute, + MaxRetries: 10, + }) + var lastErr error + for r.Next() { + _, lastErr = dmsCli.TestConnection(ctx, &dms.TestConnectionInput{ + EndpointArn: epOut.Endpoint.EndpointArn, + ReplicationInstanceArn: proto.String(replicationARN), + }) + if lastErr == nil { + break + } + t.L().Printf("replication endpoint test failed, retrying: %s", lastErr) + } + if lastErr != nil { + return lastErr + } } t.L().Printf("creating replication task") diff --git a/pkg/sql/row/inserter.go b/pkg/sql/row/inserter.go index 965e6704b79b..9a120cfd9f69 100644 --- a/pkg/sql/row/inserter.go +++ b/pkg/sql/row/inserter.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -33,10 +32,9 @@ type Inserter struct { InsertColIDtoRowIndex catalog.TableColMap // For allocation avoidance. - marshaled []roachpb.Value - key roachpb.Key - valueBuf []byte - value roachpb.Value + key roachpb.Key + valueBuf []byte + value roachpb.Value } // MakeInserter creates a Inserter for the given table. @@ -61,7 +59,6 @@ func MakeInserter( InsertCols: insertCols, InsertColIDtoRowIndex: ColIDtoRowIndexFromCols(insertCols), - marshaled: make([]roachpb.Value, len(insertCols)), } for i := 0; i < tableDesc.GetPrimaryIndex().NumKeyColumns(); i++ { @@ -141,20 +138,6 @@ func (ri *Inserter) InsertRow( putFn = insertPutFn } - // Encode the values to the expected column type. This needs to - // happen before index encoding because certain datum types (i.e. tuple) - // cannot be used as index values. - // - // TODO(radu): the legacy marshaling is used only in rare cases; this is - // wasteful. - for i, val := range values { - // Make sure the value can be written to the column before proceeding. - var err error - if ri.marshaled[i], err = valueside.MarshalLegacy(ri.InsertCols[i].GetType(), val); err != nil { - return err - } - } - // We don't want to insert any empty k/v's, so set includeEmpty to false. // Consider the following case: // TABLE t ( @@ -177,7 +160,7 @@ func (ri *Inserter) InsertRow( ri.valueBuf, err = prepareInsertOrUpdateBatch(ctx, b, &ri.Helper, primaryIndexKey, ri.InsertCols, values, ri.InsertColIDtoRowIndex, - ri.marshaled, ri.InsertColIDtoRowIndex, + ri.InsertColIDtoRowIndex, &ri.key, &ri.value, ri.valueBuf, putFn, overwrite, traceKV) if err != nil { return err diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index 288463fd1537..f0eefa66721d 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" - "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -48,7 +47,6 @@ type Updater struct { ri Inserter // For allocation avoidance. - marshaled []roachpb.Value newValues []tree.Datum key roachpb.Key valueBuf []byte @@ -175,7 +173,6 @@ func MakeUpdater( UpdateCols: updateCols, UpdateColIDtoRowIndex: updateColIDtoRowIndex, primaryKeyColChange: primaryKeyColChange, - marshaled: make([]roachpb.Value, len(updateCols)), oldIndexEntries: make([][]rowenc.IndexEntry, len(includeIndexes)), newIndexEntries: make([][]rowenc.IndexEntry, len(includeIndexes)), } @@ -242,18 +239,6 @@ func (ru *Updater) UpdateRow( } } - // Check that the new value types match the column types. This needs to - // happen before index encoding because certain datum types (i.e. tuple) - // cannot be used as index values. - // - // TODO(radu): the legacy marshaling is used only in rare cases; this is - // wasteful. - for i, val := range updateValues { - if ru.marshaled[i], err = valueside.MarshalLegacy(ru.UpdateCols[i].GetType(), val); err != nil { - return nil, err - } - } - // Update the row values. copy(ru.newValues, oldValues) for i, updateCol := range ru.UpdateCols { @@ -381,7 +366,7 @@ func (ru *Updater) UpdateRow( ru.valueBuf, err = prepareInsertOrUpdateBatch(ctx, batch, &ru.Helper, primaryIndexKey, ru.FetchCols, ru.newValues, ru.FetchColIDtoRowIndex, - ru.marshaled, ru.UpdateColIDtoRowIndex, + ru.UpdateColIDtoRowIndex, &ru.key, &ru.value, ru.valueBuf, insertPutFn, true /* overwrite */, traceKV) if err != nil { return nil, err diff --git a/pkg/sql/row/writer.go b/pkg/sql/row/writer.go index 796230d950c1..ecacc54fb6b0 100644 --- a/pkg/sql/row/writer.go +++ b/pkg/sql/row/writer.go @@ -75,13 +75,10 @@ func ColMapping(fromCols, toCols []catalog.Column) []int { // - fetchedCols is the list of schema columns that have been fetched // in preparation for this update. // - values is the SQL-level row values that are being written. -// - marshaledValues contains the pre-encoded KV-level row values. -// marshaledValues is only used when writing single column families. -// Regardless of whether there are single column families, -// pre-encoding must occur prior to calling this function to check whether -// the encoding is _possible_ (i.e. values fit in the column types, etc). -// - valColIDMapping/marshaledColIDMapping is the mapping from column -// IDs into positions of the slices values or marshaledValues. +// - valColIDMapping is the mapping from column IDs into positions of the slice +// values. +// - updatedColIDMapping is the mapping from column IDs into the positions of +// the updated values. // - kvKey and kvValues must be heap-allocated scratch buffers to write // roachpb.Key and roachpb.Value values. // - rawValueBuf must be a scratch byte array. This must be reinitialized @@ -97,8 +94,7 @@ func prepareInsertOrUpdateBatch( fetchedCols []catalog.Column, values []tree.Datum, valColIDMapping catalog.TableColMap, - marshaledValues []roachpb.Value, - marshaledColIDMapping catalog.TableColMap, + updatedColIDMapping catalog.TableColMap, kvKey *roachpb.Key, kvValue *roachpb.Value, rawValueBuf []byte, @@ -110,7 +106,7 @@ func prepareInsertOrUpdateBatch( family := &families[i] update := false for _, colID := range family.ColumnIDs { - if _, ok := marshaledColIDMapping.Get(colID); ok { + if _, ok := updatedColIDMapping.Get(colID); ok { update = true break } @@ -139,12 +135,18 @@ func prepareInsertOrUpdateBatch( // Storage optimization to store DefaultColumnID directly as a value. Also // backwards compatible with the original BaseFormatVersion. - idx, ok := marshaledColIDMapping.Get(family.DefaultColumnID) + idx, ok := valColIDMapping.Get(family.DefaultColumnID) if !ok { continue } - if marshaledValues[idx].RawBytes == nil { + typ := fetchedCols[idx].GetType() + marshaled, err := valueside.MarshalLegacy(typ, values[idx]) + if err != nil { + return nil, err + } + + if marshaled.RawBytes == nil { if overwrite { // If the new family contains a NULL value, then we must // delete any pre-existing row. @@ -154,10 +156,10 @@ func prepareInsertOrUpdateBatch( // We only output non-NULL values. Non-existent column keys are // considered NULL during scanning and the row sentinel ensures we know // the row exists. - if err := helper.checkRowSize(ctx, kvKey, &marshaledValues[idx], family.ID); err != nil { + if err := helper.checkRowSize(ctx, kvKey, &marshaled, family.ID); err != nil { return nil, err } - putFn(ctx, batch, kvKey, &marshaledValues[idx], traceKV) + putFn(ctx, batch, kvKey, &marshaled, traceKV) } continue