Skip to content

Commit

Permalink
copy: fix vectorized copy handling of column families
Browse files Browse the repository at this point in the history
Before handing off KV batches to the KV layer we sort them but when
multiple column families are in use this sorting garbles the kys making
reusing it for the prefix keys across families invalid. Fix by saving
a copy of the keys when finishing the first family before sorting.

In order to test this improve the copy-from-kvtrace command to sort
the KVs so we can get consistent results from row vs. vector.

Fixes: #103220

Release note (bug fix): COPY in 23.1.0 and beta versions would
incorrectly encode data with multiple column families. The data must be
dropped and re-imported to be encoded correctly.
  • Loading branch information
cucaroach authored and celiala committed May 15, 2023
1 parent 358e0d8 commit 46ec47c
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 5 deletions.
25 changes: 23 additions & 2 deletions pkg/sql/colenc/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type BatchEncoder struct {
// Slice of keys we can reuse across each call to Prepare and between each
// column family.
keys []roachpb.Key
// Slice of keys prefixes so we don't have to re-encode PK for each family.
savedPrefixes []roachpb.Key
// Slice of value we can reuse across each call to Prepare and between each
// column family.
values [][]byte
Expand Down Expand Up @@ -205,6 +207,8 @@ func (b *BatchEncoder) resetBuffers() {
b.extraKeys[row] = b.extraKeys[row][:0]
b.lastColIDs[row] = 0
}

b.savedPrefixes = nil
}

func intMax(a, b int) int {
Expand Down Expand Up @@ -389,6 +393,14 @@ func (b *BatchEncoder) encodePK(ctx context.Context, ind catalog.Index) error {
}
}

// If we have more than one family we have to copy the keys in order to
// re-use their prefixes because the putter routines will sort
// and mutate the kys slice.
if len(families) > 1 && b.savedPrefixes == nil {
b.savedPrefixes = make([]roachpb.Key, len(kys))
copy(b.savedPrefixes, kys)
}

// TODO(cucaroach): For updates overwrite makes this a plain put.
b.p.CPutTuplesEmpty(kys, values)

Expand Down Expand Up @@ -537,6 +549,15 @@ func (b *BatchEncoder) encodeSecondaryIndexWithFamilies(
continue
}
}

// If we have more than one family we have to copy the keys in order to
// re-use their prefixes because the putter routines will sort
// and mutate the kys slice.
if len(familyIDs) > 1 && b.savedPrefixes == nil {
b.savedPrefixes = make([]roachpb.Key, len(kys))
copy(b.savedPrefixes, kys)
}

// If we are looking at family 0, encode the data as BYTES, as it might
// include encoded primary key columns. For other families,
// use the tuple encoding for the value.
Expand Down Expand Up @@ -651,8 +672,8 @@ func (b *BatchEncoder) initFamily(familyIndex, familyID int) {
continue
}
offset := row * b.keyBufSize
// Save old slice.
prefix := kys[row][:b.keyPrefixOffsets[row]]
// Get a slice pointing to prefix bytes.
prefix := b.savedPrefixes[row][:b.keyPrefixOffsets[row]]
// Set slice to new space.
kys[row] = keyBuf[offset : offset : b.keyBufSize+offset]
// Append prefix.
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/colenc/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,27 @@ func TestColFamDropPKNot(t *testing.T) {
checkEqual(t, kvs1, kvs2)
}

func TestColFamilies(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s, db, kvdb := serverutils.StartServer(t, testArgs)
defer s.Stopper().Stop(ctx)
r := sqlutils.MakeSQLRunner(db)
r.Exec(t, "CREATE TABLE t (id INT PRIMARY KEY, c1 INT NOT NULL, c2 INT NOT NULL, FAMILY cf1 (id, c1), FAMILY cf2(c2))")
sv := &s.ClusterSettings().SV
desc := desctestutils.TestingGetTableDescriptor(
kvdb, keys.SystemSQLCodec, "defaultdb", "public", "t")

row1 := []tree.Datum{tree.NewDInt(2), tree.NewDInt(1), tree.NewDInt(2)}
row2 := []tree.Datum{tree.NewDInt(1), tree.NewDInt(2), tree.NewDInt(1)}
kvs1, err1 := buildRowKVs([]tree.Datums{row1, row2}, desc, desc.PublicColumns(), sv)
require.NoError(t, err1)
kvs2, err2 := buildVecKVs([]tree.Datums{row1, row2}, desc, desc.PublicColumns(), sv)
require.NoError(t, err2)
checkEqual(t, kvs1, kvs2)
}

// TestColIDToRowIndexNull tests case where insert cols is subset of public columns.
func TestColIDToRowIndexNull(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
8 changes: 5 additions & 3 deletions pkg/sql/copy/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math/rand"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -154,16 +155,17 @@ func TestDataDriven(t *testing.T) {
}()
require.NoError(t, err)
vals := make([]driver.Value, 1)
var results strings.Builder
var results []string
for err = nil; err == nil; {
err = rows.Next(vals)
if err == io.EOF {
break
}
require.NoError(t, err)
results.WriteString(fmt.Sprintf("%v\n", vals[0]))
results = append(results, fmt.Sprintf("%v", vals[0]))
}
return results.String()
sort.Strings(results)
return strings.Join(results, "\n")
}
case "copy-to", "copy-to-error":
var buf bytes.Buffer
Expand Down
52 changes: 52 additions & 0 deletions pkg/sql/copy/testdata/copy_from
Original file line number Diff line number Diff line change
Expand Up @@ -660,3 +660,55 @@ COPY tcomp FROM STDIN
1 (1, 2)
----
CPut /Table/<>/1/1/0 -> /TUPLE/

# Regression test for #103220
exec-ddl
CREATE TABLE tfam (id INT PRIMARY KEY, c1 INT NOT NULL, c2 INT NOT NULL, FAMILY cf1 (id, c1), FAMILY cf2(c2))
----

copy-from-kvtrace
COPY tfam FROM STDIN QUOTE '"' CSV
2,1,2
1,2,1
----
CPut /Table/<>/1/1/0 -> /TUPLE/2:2:Int/2
CPut /Table/<>/1/1/1/1 -> /INT/1
CPut /Table/<>/1/2/0 -> /TUPLE/2:2:Int/1
CPut /Table/<>/1/2/1/1 -> /INT/2

query
SELECT * FROM tfam
----
1|2|1
2|1|2


exec-ddl
CREATE TABLE tfam2 (id INT PRIMARY KEY, c1 INT NOT NULL, c2 INT NOT NULL, c3 INT NOT NULL, FAMILY cf1 (id, c1), FAMILY cf2(c2), FAMILY cf3(c3), INDEX(c2,c1,c3))
----

copy-from-kvtrace
COPY tfam2 FROM STDIN QUOTE '"' CSV
2,1,2,3
1,2,1,4
3,5,2,1
----
CPut /Table/<>/1/1/0 -> /TUPLE/2:2:Int/2
CPut /Table/<>/1/1/1/1 -> /INT/1
CPut /Table/<>/1/1/2/1 -> /INT/4
CPut /Table/<>/1/2/0 -> /TUPLE/2:2:Int/1
CPut /Table/<>/1/2/1/1 -> /INT/2
CPut /Table/<>/1/2/2/1 -> /INT/3
CPut /Table/<>/1/3/0 -> /TUPLE/2:2:Int/5
CPut /Table/<>/1/3/1/1 -> /INT/2
CPut /Table/<>/1/3/2/1 -> /INT/1
InitPut /Table/<>/2/1/2/4/1/0 -> /BYTES/
InitPut /Table/<>/2/2/1/3/2/0 -> /BYTES/
InitPut /Table/<>/2/2/5/1/3/0 -> /BYTES/

query
SELECT * FROM tfam2
----
1|2|1|4
2|1|2|3
3|5|2|1

0 comments on commit 46ec47c

Please sign in to comment.