100813: streamingccl: deflake TestRandomClientGeneration r=adityamaru a=msbutler

This patch fixes 4 bugs in TestRandomClientGeneration that were responsible for the persistent flakiness and lack of coverage in this test:
- the RandomStreamClient no longer instantiates keys with a table prefix that collides with the job info table prefix. This collision was the original cause of the flakes reported in #99343.
- getPartitionSpanToTableID() now generates a correct map from the source partition keyspace to table ID. Previously, the key spans in the map didn't contain keys that mapped to anything logical in the cockroach keyspace.
- assertEqualKVs() now checks for keys in the destination tenant keyspace (see the sketch below).
- assertEqualKVs() now actually asserts that KVs were found. Before, the assertion could pass even if no keys were checked, which had been happening for months and allowed the bugs above to infest this test.
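
Purely as illustration (not part of this patch), here is a minimal sketch of how the rekeyed destination-tenant span for a random-client table can be derived; the tenant IDs and base table ID mirror the values used in the test, and the standalone `main` wrapper is only for demonstration:

```
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// The test streams from tenant 20 and rekeys into tenant 30 (20 + 10).
	destCodec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(30))

	// The random stream client now hands out table IDs starting at 5000, so
	// the first table's destination span is the rekeyed prefix for that ID.
	startKey := destCodec.TablePrefix(5000)
	targetSpan := roachpb.Span{Key: startKey, EndKey: startKey.PrefixEnd()}

	// assertEqualKVs bounds its MVCC iteration to a span like this one rather
	// than a prefix built with the source (system-tenant) codec.
	fmt.Println(targetSpan)
}
```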

Fixes #99343

Release note: None

100952: cli: trash `TestNoLinkForbidden` r=rail a=rickystewart

This test does not work:
1. The test [has been broken](#74119) for years.
2. The test is not sensible in the Bazel world anyway, and under remote execution the test fails with an error like the following:

```
 build.go:59: go/build: go list github.com/cockroachdb/cockroach/pkg/cmd/cockroach: fork/exec GOROOT/bin/go: no such file or directory
```

Replacing this test with working, Bazel-based functionality is tracked in #81526.

Epic: CRDB-17165
Release note: None
Closes #74119.

100965: sql: link issue to unimplemented mutations in udfs r=mgartner,rytaft a=rharding6373

Links an issue to the unimplemented errors for mutations in UDFs.
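
For context, below is a minimal sketch (not taken from the patch; the statement tag is just an illustrative value) of the issue-linked `unimplemented` error pattern that this change applies in the optbuilder:

```
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
)

func main() {
	// Construct an unimplemented error that carries a tracking issue number;
	// the optbuilder now raises one of these for INSERT/UPDATE/DELETE
	// statements inside a UDF body.
	err := unimplemented.NewWithIssuef(87289, "%s usage inside a function definition", "INSERT")
	fmt.Println(err)
}
```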

Epic: None
Informs: #87289
Fixes: #99715

Release note: None

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: rharding6373 <[email protected]>
4 people committed Apr 10, 2023
4 parents a73b33f + 3539505 + 3522c98 + 0d66c6c commit 9ce0ac2
Showing 6 changed files with 63 additions and 56 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -78,7 +78,8 @@ const (
// TODO(dt): just make interceptors a singleton, not the whole client.
var randomStreamClientSingleton = func() *RandomStreamClient {
c := RandomStreamClient{}
c.mu.tableID = 52
// Make the base tableID really large to prevent colliding with system table IDs.
c.mu.tableID = 5000
return &c
}()

1 change: 0 additions & 1 deletion pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -137,7 +137,6 @@ go_test(
"//pkg/testutils/datapathutils",
"//pkg/testutils/distsqlutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/keysutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
88 changes: 55 additions & 33 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -37,7 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/keysutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -335,48 +336,56 @@ func TestStreamIngestionProcessor(t *testing.T) {
})
}

// getPartitionSpanToTableID maps a partition's span to the tableID it covers in
// the source keyspace. It assumes the source used a random_stream_client, which generates keys for
// a single table span per partition.
func getPartitionSpanToTableID(
t *testing.T, partitions []streamclient.PartitionInfo,
) map[string]int {
pSpanToTableID := make(map[string]int)

// Aggregate the table IDs which should have been ingested.
for _, pa := range partitions {
pKey := roachpb.Key(pa.ID)
pSpan := roachpb.Span{Key: pKey, EndKey: pKey.Next()}
require.Equal(t, 1, len(pa.Spans), "unexpected number of spans in the partition")
pSpan := pa.Spans[0]
paURL, err := url.Parse(string(pa.SubscriptionToken))
require.NoError(t, err)
id, err := strconv.Atoi(paURL.Host)
require.NoError(t, err)
pSpanToTableID[pSpan.String()] = id
t.Logf("Partition Span %s; Partition ID %d", pSpan.String(), id)
}
return pSpanToTableID
}

// assertEqualKVs iterates over the store in `srv` and compares the MVCC KVs
// against the in-memory copy of events stored in the `streamValidator`. This
// ensures that the stream ingestion processor ingested at least as much data as
// was streamed up until partitionTimestamp.
// was streamed up until partitionTimestamp. The function returns true if it
// validated at least one streamed kv.
func assertEqualKVs(
t *testing.T,
tc *testcluster.TestCluster,
srv serverutils.TestServerInterface,
streamValidator *streamClientValidator,
tableID int,
targetSpan roachpb.Span,
partitionTimestamp hlc.Timestamp,
) {
key := keysutils.TestingSQLCodec.TablePrefix(uint32(tableID))

) (foundKVs bool) {
t.Logf("target span %s; partition ts %s", targetSpan, partitionTimestamp)
if partitionTimestamp.WallTime == 0 {
// Implies this span never got a checkpoint
return foundKVs
}
// Iterate over the store.
store := tc.GetFirstStoreFromServer(t, 0)
store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)
it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
LowerBound: key,
UpperBound: key.PrefixEnd(),
LowerBound: targetSpan.Key,
UpperBound: targetSpan.EndKey,
})
defer it.Close()
var prevKey roachpb.Key
var valueTimestampTuples []roachpb.KeyValue
var err error
for it.SeekGE(storage.MVCCKey{Key: key}); ; it.Next() {
for it.SeekGE(storage.MVCCKey{Key: targetSpan.Key}); ; it.Next() {
if ok, err := it.Valid(); !ok {
if err != nil {
t.Fatal(err)
@@ -389,20 +398,20 @@ func assertEqualKVs(
if partitionTimestamp.Less(it.UnsafeKey().Timestamp) {
continue
}

foundKVs = true
newKey := (prevKey != nil && !it.UnsafeKey().Key.Equal(prevKey)) || prevKey == nil
prevKey = it.UnsafeKey().Clone().Key

descriptiveErrorMsg := fmt.Sprintf("Key %s; Ts %s: Is new Key %t; Partition Ts %s", it.UnsafeKey().Key, it.UnsafeKey().Timestamp, newKey, partitionTimestamp)
if newKey {
// All value ts should have been drained at this point, otherwise there is
// a mismatch between the streamed and ingested data.
require.Equal(t, 0, len(valueTimestampTuples))
valueTimestampTuples, err = streamValidator.getValuesForKeyBelowTimestamp(
string(it.UnsafeKey().Key), partitionTimestamp)
require.NoError(t, err)
require.NoError(t, err, descriptiveErrorMsg)
}

require.Greater(t, len(valueTimestampTuples), 0)
// Implies there exists a key in the store that was not logged by the stream validator.
require.Greater(t, len(valueTimestampTuples), 0, descriptiveErrorMsg)
// Since the iterator goes from latest to older versions, we compare
// starting from the end of the slice that is sorted by timestamp.
latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1]
@@ -419,6 +428,7 @@ func assertEqualKVs(
// for the next iteration.
valueTimestampTuples = valueTimestampTuples[0 : len(valueTimestampTuples)-1]
}
return foundKVs
}

func makeTestStreamURI(
@@ -448,11 +458,9 @@ func TestRandomClientGeneration(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
registry := tc.Server(0).JobRegistry().(*jobs.Registry)
db := tc.Server(0).InternalDB().(descs.DB)
srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer srv.Stopper().Stop(ctx)
registry := srv.JobRegistry().(*jobs.Registry)

// TODO: Consider testing variations on these parameters.
tenantID := roachpb.MustMakeTenantID(20)
@@ -470,7 +478,6 @@

topo, err := randomStreamClient.Plan(ctx, rps.StreamID)
require.NoError(t, err)
// One system and two table data partitions.
require.Equal(t, 2 /* numPartitions */, len(topo.Partitions))

initialScanTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
@@ -479,6 +486,7 @@
// Cancel the flow after emitting 1000 checkpoint events from the client.
mu := syncutil.Mutex{}
cancelAfterCheckpoints := makeCheckpointEventCounter(&mu, 1000, cancel)

tenantRekey := execinfrapb.TenantRekey{
OldID: tenantID,
NewID: roachpb.MustMakeTenantID(tenantID.ToUint64() + 10),
@@ -495,12 +503,11 @@ func TestRandomClientGeneration(t *testing.T) {
randomStreamClient.RegisterInterception(cancelAfterCheckpoints)
randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator))

out, err := runStreamIngestionProcessor(ctx, t, registry, db,
out, err := runStreamIngestionProcessor(ctx, t, registry, srv.InternalDB().(descs.DB),
topo, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey,
randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/)
require.NoError(t, err)

partitionSpanToTableID := getPartitionSpanToTableID(t, topo.Partitions)
numResolvedEvents := 0
maxResolvedTimestampPerPartition := make(map[string]hlc.Timestamp)
for {
@@ -529,9 +536,9 @@ func TestRandomClientGeneration(t *testing.T) {
latestResolvedTimestamp = resolvedSpan.Timestamp
}

// Track the max resolved timestamp per partition.
if ts, ok := maxResolvedTimestampPerPartition[resolvedSpan.Span.String()]; !ok ||
ts.Less(resolvedSpan.Timestamp) {
// Track the max resolved timestamp per partition. Note that resolved
// spans are mapped to the source tenant keyspace.
if maxResolvedTimestampPerPartition[resolvedSpan.Span.String()].Less(resolvedSpan.Timestamp) {
maxResolvedTimestampPerPartition[resolvedSpan.Span.String()] = resolvedSpan.Timestamp
}
numResolvedEvents++
@@ -545,13 +552,28 @@ func TestRandomClientGeneration(t *testing.T) {
for _, failure := range streamValidator.failures() {
t.Error(failure)
}

for pSpan, id := range partitionSpanToTableID {
foundKVs := false
ingestionCodec := keys.MakeSQLCodec(tenantRekey.NewID)
for pSpan, id := range getPartitionSpanToTableID(t, topo.Partitions) {
// Scan the store for KVs ingested by this partition, and compare the MVCC
// KVs against the KVEvents streamed up to the max ingested timestamp for
// the partition.
assertEqualKVs(t, tc, streamValidator, id, maxResolvedTimestampPerPartition[pSpan])
//
// Note that target span must be rekeyed to the destination
// tenant key space.
startKey := ingestionCodec.TablePrefix(uint32(id))
targetSpan := roachpb.Span{Key: startKey, EndKey: startKey.PrefixEnd()}
if assertEqualKVs(t, srv, streamValidator, targetSpan,
maxResolvedTimestampPerPartition[pSpan]) {
foundKVs = true
}
}
// Note: we only assert that KVs were found over all partitions instead of in
// each partition because it is possible for a partition to not send any
// checkpoint events. This stream ingestion processor only terminates once a
// total number of checkpoints have been reached and makes no guarantees that
// each partition gets a checkpoint.
require.True(t, foundKVs, "expected to find and assert equal kvs")
require.Greater(t, numResolvedEvents, 0, "at least 1 resolved event expected")
}

1 change: 0 additions & 1 deletion pkg/cli/BUILD.bazel
@@ -388,7 +388,6 @@ go_test(
"//pkg/sql/tests",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/buildutil",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
20 changes: 0 additions & 20 deletions pkg/cli/flags_test.go
@@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/buildutil"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -57,25 +56,6 @@ func TestStdFlagToPflag(t *testing.T) {
})
}

func TestNoLinkForbidden(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Verify that the cockroach binary doesn't depend on certain packages.
buildutil.VerifyNoImports(t,
"github.com/cockroachdb/cockroach/pkg/cmd/cockroach", true,
[]string{
"testing", // defines flags
"go/build", // probably not something we want in the main binary
},
[]string{},
// The errors library uses go/build to determine
// the list of source directories (used to strip the source prefix
// in stack trace reports).
"github.com/cockroachdb/errors/withstack",
)
}

func TestCacheFlagValue(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
6 changes: 6 additions & 0 deletions pkg/sql/opt/optbuilder/builder.go
@@ -313,6 +313,12 @@ func (b *Builder) buildStmt(
switch stmt := stmt.(type) {
case *tree.Select:
case tree.SelectStatement:
case *tree.Delete:
panic(unimplemented.NewWithIssuef(87289, "%s usage inside a function definition", stmt.StatementTag()))
case *tree.Insert:
panic(unimplemented.NewWithIssuef(87289, "%s usage inside a function definition", stmt.StatementTag()))
case *tree.Update:
panic(unimplemented.NewWithIssuef(87289, "%s usage inside a function definition", stmt.StatementTag()))
default:
panic(unimplemented.Newf("user-defined functions", "%s usage inside a function definition", stmt.StatementTag()))
}