Skip to content

Commit

Permalink
protectedts: fix panic when fetching protectedts records
Browse files Browse the repository at this point in the history
Previously, we incorrectly assumed that all records in the
`system.protectedts_records` table would have a non NULL target
column. While this is enforced for all protected timestamp
records written in a 22.1+ cluster, this is not true for records
that might have been prior to the 22.1 cluster version being
finalized.

This change makes the method responsible for reading protectedts
records more defensive when encountering a NULL target column.

Fixes: #79684

Release note: None
  • Loading branch information
adityamaru committed Apr 11, 2022
1 parent 5c1ae72 commit ba597f7
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ go_test(
shard_count = 16,
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/protectedts",
Expand All @@ -62,6 +64,7 @@ go_test(
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/log",
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/protectedts/ptstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,13 @@ func rowToRecord(

if !useDeprecatedProtectedTSStorage(ctx, st, knobs) {
target := &ptpb.Target{}
if err := protoutil.Unmarshal([]byte(*row[6].(*tree.DBytes)), target); err != nil {
targetDBytes, ok := row[6].(*tree.DBytes)
if !ok {
// We are reading a pre-22.1 protected timestamp record that has a NULL
// target column, so there is nothing more to do.
return nil
}
if err := protoutil.Unmarshal([]byte(*targetDBytes), target); err != nil {
return errors.Wrapf(err, "failed to unmarshal target for %v", r.ID)
}
r.Target = target
Expand Down
66 changes: 66 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ import (
"sort"
"strconv"
"testing"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand All @@ -41,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -803,6 +808,67 @@ func TestErrorsFromSQL(t *testing.T) {
}), "failed to read records: boom")
}

// TestNullTargetColumnResolution is a regression test that ensures that
// pre-22.1 protected timestamp records that will have a NULL target column are
// still resolved correctly by the protectedts provider. Previously, this would
// panic due to an incorrect assumption that the column would be non-nullable.
func TestNullTargetColumnResolution(t *testing.T) {
ctx := context.Background()
params, _ := tests.CreateTestServerParams()

disableUpgradeCh := make(chan struct{})
params.Knobs = base.TestingKnobs{
Server: &server.TestingKnobs{
BinaryVersionOverride: clusterversion.ByKey(
clusterversion.AlterSystemProtectedTimestampAddColumn - 1),
DisableAutomaticVersionUpgrade: disableUpgradeCh,
},
}

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
defer tc.Stopper().Stop(ctx)

s := tc.Server(0)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
ptp := execCfg.ProtectedTimestampProvider
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

// Write an old style protected timestamp record with a null target column.
ts := hlc.Timestamp{WallTime: time.Now().UnixNano()}
recordID := uuid.MakeV4()
rec := jobsprotectedts.MakeRecord(recordID, int64(1), ts, []roachpb.Span{keys.EverythingSpan},
jobsprotectedts.Jobs, nil /* target */)
require.NoError(t, execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return ptp.Protect(ctx, txn, rec)
}))

// Close the channel so that the cluster version is upgraded.
close(disableUpgradeCh)
// Check the cluster version is bumped to newVersion.
testutils.SucceedsSoon(t, func() error {
var version string
sqlDB.QueryRow(t, "SELECT value FROM system.settings WHERE name = 'version'").Scan(&version)
var v clusterversion.ClusterVersion
if err := protoutil.Unmarshal([]byte(version), &v); err != nil {
return err
}
version = v.String()
if version != clusterversion.TestingBinaryVersion.String() {
return errors.Errorf("cluster version is still %s, should be %s", version, clusterversion.TestingBinaryVersion.String())
}
return nil
})

// Check the record we wrote above is correctly unmarshalled.
require.NoError(t, execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
state, err := ptp.GetState(ctx, txn)
require.NoError(t, err)

require.Len(t, state.Records, 1)
return nil
}))
}

// wrappedInternalExecutor allows errors to be injected in SQL execution.
type wrappedInternalExecutor struct {
wrapped sqlutil.InternalExecutor
Expand Down

0 comments on commit ba597f7

Please sign in to comment.