100731: c2c: clean up ReplicationFeed error handling r=lidorcarmel a=msbutler

Previously, the `ReplicationFeed` test helper had methods that would swallow errors, making it impossible to debug certain test failures. This patch cleans up the internals of the test helper and prevents error swallowing.
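A minimal, self-contained sketch of the behavior change; `consume` and `errPred` below are simplified stand-ins for the helper's `consumeUntil` and `FeedErrorPredicate`, not the real implementation:

```go
package main

import "errors"

// errPred mirrors FeedErrorPredicate: it reports whether a terminal feed
// error was expected by the test.
type errPred func(error) bool

// consume is a simplified stand-in for consumeUntil's error handling.
func consume(feedErr error, pred errPred) error {
	if feedErr != nil && pred(feedErr) {
		return nil // expected error: treat as success
	}
	return feedErr // unexpected errors are surfaced to the caller
}

func main() {
	feedErr := errors.New("stream closed unexpectedly")

	// Old call sites passed a predicate accepting every error, so an
	// unexpected failure was swallowed and the assertion passed:
	println(consume(feedErr, func(error) bool { return true }) == nil) // true

	// The patched call sites pass a predicate returning false, so the same
	// failure now reaches the test and can be debugged:
	println(consume(feedErr, func(error) bool { return false }) == nil) // false
}
```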

Fixes #100414

Release note: None

101860: util/parquet: add support for arrays r=miretskiy a=jayshrivastava

This change extends and refactors the util/parquet library to be able to read and write arrays.
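A rough usage sketch under stated assumptions: `NewSchema` and its signature are taken from the diff below, while `types.MakeArray` is assumed to be the usual way to construct an array type; error handling is collapsed for brevity.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/parquet"
)

func main() {
	// Before this change, makeColumn rejected types.ArrayFamily with a
	// FeatureNotSupported error. Now an INT[] column maps to the LIST-encoded
	// group node shown in the schema.go diff below.
	sd, err := parquet.NewSchema(
		[]string{"id", "vals"},
		[]*types.T{types.Int, types.MakeArray(types.Int)},
	)
	fmt.Println(sd, err)
}
```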

Release note: None

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071

101936: kv: deflake and unskip TestStoreResolveMetrics r=arulajmani a=nvanbenschoten

Fixes #98404.

The test had begun flaking after #98044 because we now perform more async intent resolution operations when starting a cluster. Specifically, we perform additional async intent resolution operations in service of jobs updates. These updates perform SELECT FOR UPDATE queries over the new `system.job_info` table, but then perform a 1-phase commit.

To deflake the test, we clear the intent resolution metrics after server startup.
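In sketch form, the deflaking pattern looks like the following; `resetResolveMetrics` is a hypothetical helper (the real change inlines these calls, and the metric names and `Clear` come from the diff below):

```go
package kvserver_test

import "github.com/cockroachdb/cockroach/pkg/kv/kvserver"

// resetResolveMetrics zeroes the intent-resolution counters the test asserts
// on, so async resolutions triggered by server startup (e.g. jobs updates)
// don't pollute the expected counts.
func resetResolveMetrics(store *kvserver.Store) {
	store.Metrics().ResolveCommitCount.Clear()
	store.Metrics().ResolveAbortCount.Clear()
	store.Metrics().ResolvePoisonCount.Clear()
}
```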

Release note: None

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
4 people committed Apr 20, 2023
4 parents fe19f93 + 8571abc + 42b37b2 + c286cd1 commit ccc9d02
Showing 9 changed files with 412 additions and 179 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel
@@ -38,9 +38,9 @@ go_library(
         "//pkg/testutils/serverutils",
         "//pkg/testutils/sqlutils",
         "//pkg/testutils/testcluster",
+        "//pkg/util/contextutil",
         "//pkg/util/hlc",
         "//pkg/util/protoutil",
-        "//pkg/util/syncutil",
         "@com_github_cockroachdb_apd_v3//:apd",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//require",
86 changes: 34 additions & 52 deletions pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go
@@ -27,9 +27,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
-	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -104,27 +104,27 @@ func MakeReplicationFeed(t *testing.T, f FeedSource) *ReplicationFeed {
 // Note: we don't do any buffering here. Therefore, it is required that the key
 // we want to observe will arrive at some point in the future.
 func (rf *ReplicationFeed) ObserveKey(ctx context.Context, key roachpb.Key) roachpb.KeyValue {
-	require.NoError(rf.t, rf.consumeUntil(ctx, KeyMatches(key), func(err error) bool {
-		return true
-	}))
+	rf.consumeUntil(ctx, KeyMatches(key), func(err error) bool {
+		return false
+	})
 	return *rf.msg.GetKV()
 }
 
 // ObserveResolved consumes the feed until we received resolved timestamp that's at least
 // as high as the specified low watermark. Returns observed resolved timestamp.
 func (rf *ReplicationFeed) ObserveResolved(ctx context.Context, lo hlc.Timestamp) hlc.Timestamp {
-	require.NoError(rf.t, rf.consumeUntil(ctx, ResolvedAtLeast(lo), func(err error) bool {
-		return true
-	}))
+	rf.consumeUntil(ctx, ResolvedAtLeast(lo), func(err error) bool {
+		return false
+	})
 	return minResolvedTimestamp(rf.msg.GetResolvedSpans())
 }
 
 // ObserveError consumes the feed until the feed is exhausted, and the final error should
 // match 'errPred'.
 func (rf *ReplicationFeed) ObserveError(ctx context.Context, errPred FeedErrorPredicate) {
-	require.NoError(rf.t, rf.consumeUntil(ctx, func(message streamingccl.Event) bool {
+	rf.consumeUntil(ctx, func(message streamingccl.Event) bool {
 		return false
-	}, errPred))
+	}, errPred)
 }
 
 // Close cleans up any resources.
@@ -134,51 +134,33 @@ func (rf *ReplicationFeed) Close(ctx context.Context) {
 
 func (rf *ReplicationFeed) consumeUntil(
 	ctx context.Context, pred FeedEventPredicate, errPred FeedErrorPredicate,
-) error {
-	const maxWait = 2 * time.Minute
-	doneCh := make(chan struct{})
-	mu := struct {
-		syncutil.Mutex
-		err error
-	}{}
-	defer close(doneCh)
-	go func() {
-		select {
-		case <-time.After(maxWait):
-			mu.Lock()
-			mu.err = errors.New("test timed out")
-			mu.Unlock()
-			rf.f.Close(ctx)
-		case <-doneCh:
-		}
-	}()
-
-	rowCount := 0
-	for {
-		msg, haveMoreRows := rf.f.Next()
-		if !haveMoreRows {
-			// We have run out of rows, let's try and make a nice error
-			// message.
-			mu.Lock()
-			err := mu.err
-			mu.Unlock()
-			if rf.f.Error() != nil {
-				require.True(rf.t, errPred(rf.f.Error()))
-				return nil
-			} else if err != nil {
-				rf.t.Fatal(err)
-			} else {
-				rf.t.Fatalf("ran out of rows after processing %d rows", rowCount)
-			}
-		}
-		rowCount++
-
-		require.NotNil(rf.t, msg)
-		if pred(msg) {
-			rf.msg = msg
-			return nil
-		}
-	}
+) {
+	require.NoError(rf.t, contextutil.RunWithTimeout(ctx, "consume", 2*time.Minute,
+		func(ctx context.Context) error {
+			rowCount := 0
+			for {
+				msg, haveMoreRows := rf.f.Next()
+				if !haveMoreRows {
+					if rf.f.Error() != nil {
+						if errPred(rf.f.Error()) {
+							return nil
+						}
+						return rf.f.Error()
+					}
+					return errors.Newf("ran out of rows after processing %d rows", rowCount)
+				}
+				rowCount++
+				if msg == nil {
+					return errors.New("consumed empty msg")
+				}
+				if pred(msg) {
+					rf.msg = msg
+					return nil
+				}
+			}
+		}),
+	)
 }
 
 // TenantState maintains test state related to tenant.
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/client_metrics_test.go
@@ -29,7 +29,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -176,8 +175,6 @@ func TestStoreResolveMetrics(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	skip.WithIssue(t, 98404)
-
 	// First prevent rot that would result from adding fields without handling
 	// them everywhere.
 	{
@@ -199,6 +196,12 @@
 	require.NoError(t, err)
 	span := roachpb.Span{Key: key, EndKey: key.Next()}
 
+	// Clear the metrics before starting the test so that we don't count intent
+	// resolutions from server startup.
+	store.Metrics().ResolveCommitCount.Clear()
+	store.Metrics().ResolveAbortCount.Clear()
+	store.Metrics().ResolvePoisonCount.Clear()
+
 	txn := roachpb.MakeTransaction("foo", span.Key, isolation.Serializable, roachpb.MinUserPriority, hlc.Timestamp{WallTime: 123}, 999, int32(s.NodeID()))
 
 	const resolveCommitCount = int64(200)
84 changes: 58 additions & 26 deletions pkg/util/parquet/schema.go
@@ -23,6 +23,11 @@ import (
 	"github.com/lib/pq/oid"
 )
 
+// Setting parquet.Repetitions.Optional makes parquet a column nullable. When
+// writing a datum, we will always specify a definition level to indicate if the
+// datum is null or not. See comments on nonNilDefLevel or nilDefLevel for more info.
+var defaultRepetitions = parquet.Repetitions.Optional
+
 // A schema field is an internal identifier for schema nodes used by the parquet library.
 // A value of -1 will let the library auto-assign values. This does not affect reading
 // or writing parquet files.
@@ -36,7 +41,7 @@ const defaultTypeLength = -1
 // A column stores column metadata.
 type column struct {
 	node      schema.Node
-	colWriter writeFn
+	colWriter colWriter
 	decoder   decoder
 	typ       *types.T
 }
@@ -67,7 +72,7 @@ func NewSchema(columnNames []string, columnTypes []*types.T) (*SchemaDefinition,
 	fields := make([]schema.Node, 0)
 
 	for i := 0; i < len(columnNames); i++ {
-		parquetCol, err := makeColumn(columnNames[i], columnTypes[i])
+		parquetCol, err := makeColumn(columnNames[i], columnTypes[i], defaultRepetitions)
 		if err != nil {
 			return nil, err
 		}
@@ -87,50 +92,44 @@ func NewSchema(columnNames []string, columnTypes []*types.T) (*SchemaDefinition,
 }
 
 // makeColumn constructs a column.
-func makeColumn(colName string, typ *types.T) (column, error) {
-	// Setting parquet.Repetitions.Optional makes parquet interpret all columns as nullable.
-	// When writing data, we will specify a definition level of 0 (null) or 1 (not null).
-	// See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet
-	// for more information regarding definition levels.
-	defaultRepetitions := parquet.Repetitions.Optional
-
+func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (column, error) {
 	result := column{typ: typ}
 	var err error
 	switch typ.Family() {
 	case types.BoolFamily:
-		result.node = schema.NewBooleanNode(colName, defaultRepetitions, defaultSchemaFieldID)
-		result.colWriter = writeBool
+		result.node = schema.NewBooleanNode(colName, repetitions, defaultSchemaFieldID)
+		result.colWriter = scalarWriter(writeBool)
 		result.decoder = boolDecoder{}
 		result.typ = types.Bool
 		return result, nil
 	case types.StringFamily:
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
-			defaultRepetitions, schema.StringLogicalType{}, parquet.Types.ByteArray,
+			repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray,
 			defaultTypeLength, defaultSchemaFieldID)
 
 		if err != nil {
 			return result, err
 		}
-		result.colWriter = writeString
+		result.colWriter = scalarWriter(writeString)
 		result.decoder = stringDecoder{}
 		return result, nil
 	case types.IntFamily:
 		// Note: integer datums are always signed: https://www.cockroachlabs.com/docs/stable/int.html
 		if typ.Oid() == oid.T_int8 {
 			result.node, err = schema.NewPrimitiveNodeLogical(colName,
-				defaultRepetitions, schema.NewIntLogicalType(64, true),
+				repetitions, schema.NewIntLogicalType(64, true),
 				parquet.Types.Int64, defaultTypeLength,
 				defaultSchemaFieldID)
 			if err != nil {
 				return result, err
 			}
-			result.colWriter = writeInt64
+			result.colWriter = scalarWriter(writeInt64)
 			result.decoder = int64Decoder{}
 			return result, nil
 		}
 
-		result.node = schema.NewInt32Node(colName, defaultRepetitions, defaultSchemaFieldID)
-		result.colWriter = writeInt32
+		result.node = schema.NewInt32Node(colName, repetitions, defaultSchemaFieldID)
+		result.colWriter = scalarWriter(writeInt32)
 		result.decoder = int32Decoder{}
 		return result, nil
 	case types.DecimalFamily:
@@ -149,37 +148,71 @@ func makeColumn(colName string, typ *types.T) (column, error) {
 		}
 
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
-			defaultRepetitions, schema.NewDecimalLogicalType(precision,
+			repetitions, schema.NewDecimalLogicalType(precision,
 			scale), parquet.Types.ByteArray, defaultTypeLength,
 			defaultSchemaFieldID)
 		if err != nil {
 			return result, err
 		}
-		result.colWriter = writeDecimal
+		result.colWriter = scalarWriter(writeDecimal)
 		result.decoder = decimalDecoder{}
 		return result, nil
 	case types.UuidFamily:
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
-			defaultRepetitions, schema.UUIDLogicalType{},
+			repetitions, schema.UUIDLogicalType{},
 			parquet.Types.FixedLenByteArray, uuid.Size, defaultSchemaFieldID)
 		if err != nil {
 			return result, err
 		}
-		result.colWriter = writeUUID
+		result.colWriter = scalarWriter(writeUUID)
 		result.decoder = uUIDDecoder{}
 		return result, nil
 	case types.TimestampFamily:
 		// Note that all timestamp datums are in UTC: https://www.cockroachlabs.com/docs/stable/timestamp.html
 		result.node, err = schema.NewPrimitiveNodeLogical(colName,
-			defaultRepetitions, schema.StringLogicalType{}, parquet.Types.ByteArray,
+			repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray,
 			defaultTypeLength, defaultSchemaFieldID)
 		if err != nil {
 			return result, err
 		}
 
-		result.colWriter = writeTimestamp
+		result.colWriter = scalarWriter(writeTimestamp)
 		result.decoder = timestampDecoder{}
 		return result, nil
+	case types.ArrayFamily:
+		// Arrays for type T are represented by the following:
+		// message schema {              -- toplevel schema
+		//   optional group a (LIST) {   -- list column
+		//     repeated group list {
+		//       optional T element;
+		//     }
+		//   }
+		// }
+		// Representing arrays this way makes it easier to differentiate NULL, [NULL],
+		// and [] when encoding.
+		// There is more info about encoding arrays here:
+		// https://arrow.apache.org/blog/2022/10/08/arrow-parquet-encoding-part-2/
+		elementCol, err := makeColumn("element", typ.ArrayContents(), parquet.Repetitions.Optional)
+		if err != nil {
+			return result, err
+		}
+		innerListFields := []schema.Node{elementCol.node}
+		innerListNode, err := schema.NewGroupNode("list", parquet.Repetitions.Repeated, innerListFields, defaultSchemaFieldID)
+		if err != nil {
+			return result, err
+		}
+		outerListFields := []schema.Node{innerListNode}
+		result.node, err = schema.NewGroupNodeLogical(colName, parquet.Repetitions.Optional, outerListFields, schema.ListLogicalType{}, defaultSchemaFieldID)
+		if err != nil {
+			return result, err
+		}
+		result.decoder = elementCol.decoder
+		scalarColWriter, ok := elementCol.colWriter.(scalarWriter)
+		if !ok {
+			return result, errors.AssertionFailedf("expected scalar column writer")
+		}
+		result.colWriter = arrayWriter(scalarColWriter)
+		result.typ = elementCol.typ
+		return result, nil
 
 	// TODO(#99028): implement support for the remaining types.
 	// case types.INetFamily:
@@ -196,8 +229,7 @@ func makeColumn(colName string, typ *types.T) (column, error) {
 	// case types.TimeTZFamily:
 	// case types.IntervalFamily:
 	// case types.TimestampTZFamily:
-	// case types.ArrayFamily:
 	default:
-		return result, pgerror.Newf(pgcode.FeatureNotSupported, "parquet export does not support the %v type yet", typ.Family())
+		return result, pgerror.Newf(pgcode.FeatureNotSupported, "parquet export does not support the %v type", typ.Family())
 	}
 }