Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
103131: util/parquet: add crdb metadata to parquet files r=miretskiy a=jayshrivastava

util/parquet: add option to write kv metadata to files

This change adds an option to the writer which allows the caller
to write arbitrary kv metadata to parquet files. This is useful
for testing purposes.

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None

---

util/parquet: remove dependency on writer to read parquet files

Previously, the test utils used to read parquet files
would require the writer as an argument. The main reason
the writer was required is that the writer contained
crdb-specific type information which could be used to
decode raw data until crdb datums.

With this change, the writer is updated to write this
crdb-specific type information to the parquet file in
its metadata. The reader is updated to the read type
information from the file metadata. There is a new
test utility function `ReadFile(parquetFile string)`
which can be used to read all datums from a parquet
file without providing any additional type information.
The function also returns metadata since it is possible
for users of the `Writer` to write arbitrary metadata
and such users may need this metadata in testing.

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None

103181: server: Default job wait time to 10s r=miretskiy a=miretskiy

This is a revert of cockroachdb#103134 to reset job wait period to 10s Epic: None

Release note: None

Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
3 people committed May 15, 2023
3 parents beea50d + 7a36a77 + feac253 commit 1ceb218
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 136 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with
server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically purged tenant-rw
server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw
server.shutdown.jobs_wait duration 0s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw
server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
<tr><td><div id="setting-server-secondary-tenants-redact-trace-enabled" class="anchored"><code>server.secondary_tenants.redact_trace.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>controls if server side traces are redacted for tenant operations</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-connection-wait" class="anchored"><code>server.shutdown.connection_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-drain-wait" class="anchored"><code>server.shutdown.drain_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-lease-transfer-wait" class="anchored"><code>server.shutdown.lease_transfer_wait</code></div></td><td>duration</td><td><code>5s</code></td><td>the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-query-wait" class="anchored"><code>server.shutdown.query_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-time-until-store-dead" class="anchored"><code>server.time_until_store_dead</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the time after which if there is no new gossiped information about a store, it is considered dead</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,8 +1278,7 @@ func waitForJobStatus(
}

// TestingSetIncludeParquetMetadata adds the option to turn on adding metadata
// (primary key column names) to the parquet file which is used to convert parquet
// data to JSON format
// to the parquet file which is used in testing.
func TestingSetIncludeParquetMetadata() func() {
includeParquetTestMetadata = true
return func() {
Expand Down
9 changes: 7 additions & 2 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type parquetWriter struct {
// newParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func newParquetWriterFromRow(
row cdcevent.Row, sink io.Writer, maxRowGroupSize int64,
row cdcevent.Row, sink io.Writer, opts ...parquet.Option,
) (*parquetWriter, error) {
columnNames := make([]string, len(row.ResultColumns())+1)
columnTypes := make([]*types.T, len(row.ResultColumns())+1)
Expand All @@ -48,7 +48,12 @@ func newParquetWriterFromRow(
return nil, err
}

writer, err := parquet.NewWriter(schemaDef, sink, parquet.WithMaxRowGroupLength(maxRowGroupSize))
writerConstructor := parquet.NewWriter
if includeParquetTestMetadata {
writerConstructor = parquet.NewWriterWithReaderMeta
}

writer, err := writerConstructor(schemaDef, sink, opts...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func TestParquetRows(t *testing.T) {
// Rangefeed reader can time out under stress.
skip.UnderStress(t)

defer TestingSetIncludeParquetMetadata()()

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
// TODO(#98816): cdctest.GetHydratedTableDescriptor does not work with tenant dbs.
Expand Down Expand Up @@ -110,7 +112,7 @@ func TestParquetRows(t *testing.T) {
require.NoError(t, err)

if writer == nil {
writer, err = newParquetWriterFromRow(updatedRow, f, maxRowGroupSize)
writer, err = newParquetWriterFromRow(updatedRow, f, parquet.WithMaxRowGroupLength(maxRowGroupSize))
if err != nil {
t.Fatalf(err.Error())
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/serverccl/server_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,12 @@ func TestServerStartStop(t *testing.T) {
if err := db2.Ping(); err != nil {
return err
}

// Don't wait for graceful jobs shutdown in this test since
// we want to make sure test completes reasonably quickly.
_, err = db2.Exec("SET CLUSTER SETTING server.shutdown.jobs_wait='0s'")
require.NoError(t, err)

return nil
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var (
"server.shutdown.jobs_wait",
"the maximum amount of time a server waits for all currently executing jobs "+
"to notice drain request and to perform orderly shutdown",
0*time.Second,
10*time.Second,
settings.NonNegativeDurationWithMaximum(10*time.Hour),
).WithPublic()
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/parquet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ go_library(
"@com_github_apache_arrow_go_v11//parquet",
"@com_github_apache_arrow_go_v11//parquet/compress",
"@com_github_apache_arrow_go_v11//parquet/file",
"@com_github_apache_arrow_go_v11//parquet/metadata",
"@com_github_apache_arrow_go_v11//parquet/schema",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
Expand Down
72 changes: 71 additions & 1 deletion pkg/util/parquet/decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/geo"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/bitarray"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/timeofday"
Expand Down Expand Up @@ -236,6 +237,76 @@ func (collatedStringDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return &tree.DCollatedString{Contents: string(v)}, nil
}

// decoderFromFamilyAndType returns the decoder to use based on the type oid and
// family. Note the logical similarity to makeColumn in schema.go. This is
// intentional as each decoder returned by this function corresponds to a
// particular colWriter determined by makeColumn.
// TODO: refactor to remove the code duplication with makeColumn
func decoderFromFamilyAndType(typOid oid.Oid, family types.Family) (decoder, error) {
switch family {
case types.BoolFamily:
return boolDecoder{}, nil
case types.StringFamily:
return stringDecoder{}, nil
case types.IntFamily:
typ, ok := types.OidToType[typOid]
if !ok {
return nil, errors.AssertionFailedf("could not determine type from oid %d", typOid)
}
if typ.Oid() == oid.T_int8 {
return int64Decoder{}, nil
}
return int32Decoder{}, nil
case types.DecimalFamily:
return decimalDecoder{}, nil
case types.TimestampFamily:
return timestampDecoder{}, nil
case types.TimestampTZFamily:
return timestampTZDecoder{}, nil
case types.UuidFamily:
return uUIDDecoder{}, nil
case types.INetFamily:
return iNetDecoder{}, nil
case types.JsonFamily:
return jsonDecoder{}, nil
case types.BitFamily:
return bitDecoder{}, nil
case types.BytesFamily:
return bytesDecoder{}, nil
case types.EnumFamily:
return enumDecoder{}, nil
case types.DateFamily:
return dateDecoder{}, nil
case types.Box2DFamily:
return box2DDecoder{}, nil
case types.GeographyFamily:
return geographyDecoder{}, nil
case types.GeometryFamily:
return geometryDecoder{}, nil
case types.IntervalFamily:
return intervalDecoder{}, nil
case types.TimeFamily:
return timeDecoder{}, nil
case types.TimeTZFamily:
return timeTZDecoder{}, nil
case types.FloatFamily:
typ, ok := types.OidToType[typOid]
if !ok {
return nil, errors.AssertionFailedf("could not determine type from oid %d", typOid)
}
if typ.Oid() == oid.T_float4 {
return float32Decoder{}, nil
}
return float64Decoder{}, nil
case types.OidFamily:
return oidDecoder{}, nil
case types.CollatedStringFamily:
return collatedStringDecoder{}, nil
default:
return nil, errors.AssertionFailedf("could not find decoder for type oid %d and family %d", typOid, family)
}
}

// Defeat the linter's unused lint errors.
func init() {
var _, _ = boolDecoder{}.decode(false)
Expand All @@ -253,7 +324,6 @@ func init() {
var _, _ = enumDecoder{}.decode(parquet.ByteArray{})
var _, _ = dateDecoder{}.decode(parquet.ByteArray{})
var _, _ = box2DDecoder{}.decode(parquet.ByteArray{})
var _, _ = box2DDecoder{}.decode(parquet.ByteArray{})
var _, _ = geographyDecoder{}.decode(parquet.ByteArray{})
var _, _ = geometryDecoder{}.decode(parquet.ByteArray{})
var _, _ = intervalDecoder{}.decode(parquet.ByteArray{})
Expand Down
26 changes: 0 additions & 26 deletions pkg/util/parquet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ const defaultTypeLength = -1
type column struct {
node schema.Node
colWriter colWriter
decoder decoder
typ *types.T
}

Expand Down Expand Up @@ -99,7 +98,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
case types.BoolFamily:
result.node = schema.NewBooleanNode(colName, repetitions, defaultSchemaFieldID)
result.colWriter = scalarWriter(writeBool)
result.decoder = boolDecoder{}
result.typ = types.Bool
return result, nil
case types.StringFamily:
Expand All @@ -111,7 +109,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeString)
result.decoder = stringDecoder{}
return result, nil
case types.IntFamily:
// Note: integer datums are always signed: https://www.cockroachlabs.com/docs/stable/int.html
Expand All @@ -124,13 +121,11 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeInt64)
result.decoder = int64Decoder{}
return result, nil
}

result.node = schema.NewInt32Node(colName, repetitions, defaultSchemaFieldID)
result.colWriter = scalarWriter(writeInt32)
result.decoder = int32Decoder{}
return result, nil
case types.DecimalFamily:
// According to PostgresSQL docs, scale or precision of 0 implies max
Expand All @@ -155,7 +150,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeDecimal)
result.decoder = decimalDecoder{}
return result, nil
case types.UuidFamily:
result.node, err = schema.NewPrimitiveNodeLogical(colName,
Expand All @@ -165,7 +159,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeUUID)
result.decoder = uUIDDecoder{}
return result, nil
case types.TimestampFamily:
// We do not use schema.TimestampLogicalType because the library will enforce
Expand All @@ -177,7 +170,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeTimestamp)
result.decoder = timestampDecoder{}
return result, nil
case types.TimestampTZFamily:
// We do not use schema.TimestampLogicalType because the library will enforce
Expand All @@ -189,7 +181,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeTimestampTZ)
result.decoder = timestampTZDecoder{}
return result, nil
case types.INetFamily:
result.node, err = schema.NewPrimitiveNodeLogical(colName,
Expand All @@ -199,7 +190,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeINet)
result.decoder = iNetDecoder{}
return result, nil
case types.JsonFamily:
result.node, err = schema.NewPrimitiveNodeLogical(colName,
Expand All @@ -209,7 +199,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeJSON)
result.decoder = jsonDecoder{}
return result, nil
case types.BitFamily:
result.node, err = schema.NewPrimitiveNode(colName,
Expand All @@ -219,7 +208,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeBit)
result.decoder = bitDecoder{}
return result, nil
case types.BytesFamily:
result.node, err = schema.NewPrimitiveNode(colName,
Expand All @@ -229,7 +217,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeBytes)
result.decoder = bytesDecoder{}
return result, nil
case types.EnumFamily:
result.node, err = schema.NewPrimitiveNodeLogical(colName,
Expand All @@ -239,7 +226,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeEnum)
result.decoder = enumDecoder{}
return result, nil
case types.DateFamily:
// We do not use schema.DateLogicalType because the library will enforce
Expand All @@ -251,7 +237,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeDate)
result.decoder = dateDecoder{}
return result, nil
case types.Box2DFamily:
result.node, err = schema.NewPrimitiveNodeLogical(colName,
Expand All @@ -261,7 +246,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeBox2D)
result.decoder = box2DDecoder{}
return result, nil
case types.GeographyFamily:
result.node, err = schema.NewPrimitiveNode(colName,
Expand All @@ -271,7 +255,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeGeography)
result.decoder = geographyDecoder{}
return result, nil
case types.GeometryFamily:
result.node, err = schema.NewPrimitiveNode(colName,
Expand All @@ -281,7 +264,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeGeometry)
result.decoder = geometryDecoder{}
return result, nil
case types.IntervalFamily:
result.node, err = schema.NewPrimitiveNodeLogical(colName,
Expand All @@ -291,7 +273,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeInterval)
result.decoder = intervalDecoder{}
return result, nil
case types.TimeFamily:
// CRDB stores time datums in microseconds, adjusted to UTC.
Expand All @@ -303,7 +284,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeTime)
result.decoder = timeDecoder{}
return result, nil
case types.TimeTZFamily:
// We cannot use the schema.NewTimeLogicalType because it does not support
Expand All @@ -315,7 +295,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeTimeTZ)
result.decoder = timeTZDecoder{}
return result, nil
case types.FloatFamily:
if typ.Oid() == oid.T_float4 {
Expand All @@ -326,7 +305,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeFloat32)
result.decoder = float32Decoder{}
return result, nil
}
result.node, err = schema.NewPrimitiveNode(colName,
Expand All @@ -336,12 +314,10 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeFloat64)
result.decoder = float64Decoder{}
return result, nil
case types.OidFamily:
result.node = schema.NewInt32Node(colName, repetitions, defaultSchemaFieldID)
result.colWriter = scalarWriter(writeOid)
result.decoder = oidDecoder{}
return result, nil
case types.CollatedStringFamily:
result.node, err = schema.NewPrimitiveNodeLogical(colName,
Expand All @@ -351,7 +327,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
return result, err
}
result.colWriter = scalarWriter(writeCollatedString)
result.decoder = collatedStringDecoder{}
return result, nil
case types.ArrayFamily:
// Arrays for type T are represented by the following:
Expand Down Expand Up @@ -383,7 +358,6 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c
if err != nil {
return result, err
}
result.decoder = elementCol.decoder
scalarColWriter, ok := elementCol.colWriter.(scalarWriter)
if !ok {
return result, errors.AssertionFailedf("expected scalar column writer")
Expand Down
Loading

0 comments on commit 1ceb218

Please sign in to comment.