From dfa8e22edf560211ae2a2ebf1f9a23b86887c7be Mon Sep 17 00:00:00 2001 From: shollyman Date: Tue, 24 Oct 2023 10:55:38 -0700 Subject: [PATCH] feat(bigquery/storage/managedwriter): support default value controls (#8686) * feat(bigquery/storage/managedwriter): support default value controls In terms of public surface, this PR adds new options to control how missing values are interpreted when writing. For ManagedStream instantiation, the options are: * WithDefaultMissingValueInterpretation (blanket setting for all columns) * WithMissingValueInterpretations (per-column settings) To support updates, these are added as AppendOptions: * UpdateDefaultMissingValueInterpretation * UpdateMissingValueInterpretations Implementation-wise, this PR rips out the previous schema-specific versioner and expands the concept to a versioned AppendRowsRequest template. This more general mechanism allows us to version all settings that manifest as request fields in the AppendRowsRequest. --- .../storage/managedwriter/appendresult.go | 33 +-- .../managedwriter/appendresult_test.go | 15 +- bigquery/storage/managedwriter/client.go | 1 + bigquery/storage/managedwriter/connection.go | 18 +- .../storage/managedwriter/integration_test.go | 95 +++++- .../storage/managedwriter/managed_stream.go | 17 +- .../managedwriter/managed_stream_test.go | 18 +- bigquery/storage/managedwriter/options.go | 70 ++++- .../storage/managedwriter/options_test.go | 92 ++++++ .../storage/managedwriter/send_optimizer.go | 155 ++++++---- .../managedwriter/send_optimizer_test.go | 158 +++++----- .../storage/managedwriter/testdata/schemas.go | 34 +++ .../managedwriter/testdata/testing.pb.go | 275 ++++++++++++++++-- .../managedwriter/testdata/testing.proto | 18 ++ 14 files changed, 792 insertions(+), 207 deletions(-) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index 47959eb14b5e..46709e70bf60 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -166,7 +166,7 @@ type pendingWrite struct { // likely outcome when processing requests and it allows us to be efficient on send. // We retain the additional information to build the complete request in the related fields. req *storagepb.AppendRowsRequest - descVersion *descriptorVersion // schema at time of creation + reqTmpl *versionedTemplate // request template at time of creation traceID string writeStreamID string @@ -188,21 +188,21 @@ type pendingWrite struct { // to the pending results for later consumption. The provided context is // embedded in the pending write, as the write may be retried and we want // to respect the original context for expiry/cancellation etc. -func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, curDescVersion *descriptorVersion, writeStreamID, traceID string) *pendingWrite { +func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, reqTmpl *versionedTemplate, writeStreamID, traceID string) *pendingWrite { pw := &pendingWrite{ writer: src, result: newAppendResult(), reqCtx: ctx, - req: req, - descVersion: curDescVersion, + req: req, // minimal req, typically just row data + reqTmpl: reqTmpl, // remainder of templated request writeStreamID: writeStreamID, traceID: traceID, } // Compute the approx size for flow control purposes. pw.reqSize = proto.Size(pw.req) + len(writeStreamID) + len(traceID) - if pw.descVersion != nil { - pw.reqSize += proto.Size(pw.descVersion.descriptorProto) + if pw.reqTmpl != nil { + pw.reqSize += proto.Size(pw.reqTmpl.tmpl) } return pw } @@ -221,33 +221,22 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error) close(pw.result.ready) // Cleanup references remaining on the write explicitly. pw.req = nil - pw.descVersion = nil + pw.reqTmpl = nil pw.writer = nil pw.reqCtx = nil } func (pw *pendingWrite) constructFullRequest(addTrace bool) *storagepb.AppendRowsRequest { req := &storagepb.AppendRowsRequest{} + if pw.reqTmpl != nil { + req = proto.Clone(pw.reqTmpl.tmpl).(*storagepb.AppendRowsRequest) + } if pw.req != nil { - req = proto.Clone(pw.req).(*storagepb.AppendRowsRequest) + proto.Merge(req, pw.req) } if addTrace { req.TraceId = buildTraceID(&streamSettings{TraceID: pw.traceID}) } req.WriteStream = pw.writeStreamID - if pw.descVersion != nil { - ps := &storagepb.ProtoSchema{ - ProtoDescriptor: pw.descVersion.descriptorProto, - } - if pr := req.GetProtoRows(); pr != nil { - pr.WriterSchema = ps - } else { - req.Rows = &storagepb.AppendRowsRequest_ProtoRows{ - ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ - WriterSchema: ps, - }, - } - } - } return req } diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go index 43a3804633de..ced180295ef7 100644 --- a/bigquery/storage/managedwriter/appendresult_test.go +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -132,7 +132,8 @@ func TestPendingWrite(t *testing.T) { func TestPendingWrite_ConstructFullRequest(t *testing.T) { testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")} - testDV := newDescriptorVersion(testDP) + testTmpl := newVersionedTemplate().revise(reviseProtoSchema(testDP)) + testEmptyTraceID := buildTraceID(&streamSettings{}) for _, tc := range []struct { @@ -144,7 +145,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) { { desc: "nil request", pw: &pendingWrite{ - descVersion: testDV, + reqTmpl: testTmpl, }, want: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ @@ -159,8 +160,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) { { desc: "empty req w/trace", pw: &pendingWrite{ - req: &storagepb.AppendRowsRequest{}, - descVersion: testDV, + req: &storagepb.AppendRowsRequest{}, + reqTmpl: testTmpl, }, addTrace: true, want: &storagepb.AppendRowsRequest{ @@ -177,8 +178,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) { { desc: "basic req", pw: &pendingWrite{ - req: &storagepb.AppendRowsRequest{}, - descVersion: testDV, + req: &storagepb.AppendRowsRequest{}, + reqTmpl: testTmpl, }, want: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ @@ -194,7 +195,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) { desc: "everything w/trace", pw: &pendingWrite{ req: &storagepb.AppendRowsRequest{}, - descVersion: testDV, + reqTmpl: testTmpl, traceID: "foo", writeStreamID: "streamid", }, diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index d75e711a0ef8..ec872ec13861 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -151,6 +151,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient id: newUUID(writerIDPrefix), c: c, streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } // apply writer options. for _, opt := range opts { diff --git a/bigquery/storage/managedwriter/connection.go b/bigquery/storage/managedwriter/connection.go index d41ecc6e053a..5c3d81f1c59b 100644 --- a/bigquery/storage/managedwriter/connection.go +++ b/bigquery/storage/managedwriter/connection.go @@ -376,8 +376,22 @@ func (co *connection) lockingAppend(pw *pendingWrite) error { // Additionally, we check multiplex status as schema changes for explicit streams // require reconnect, whereas multiplex does not. forceReconnect := false - if pw.writer != nil && pw.descVersion != nil && pw.descVersion.isNewer(pw.writer.curDescVersion) { - pw.writer.curDescVersion = pw.descVersion + promoted := false + if pw.writer != nil && pw.reqTmpl != nil { + if !pw.reqTmpl.Compatible(pw.writer.curTemplate) { + if pw.writer.curTemplate == nil { + // promote because there's no current template + pw.writer.curTemplate = pw.reqTmpl + promoted = true + } else { + if pw.writer.curTemplate.versionTime.Before(pw.reqTmpl.versionTime) { + pw.writer.curTemplate = pw.reqTmpl + promoted = true + } + } + } + } + if promoted { if co.optimizer == nil { forceReconnect = true } else { diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index e13439afa664..03970787bdc2 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -259,7 +259,9 @@ func TestIntegration_ManagedWriter(t *testing.T) { t.Run("TestLargeInsertWithRetry", func(t *testing.T) { testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset) }) - + t.Run("DefaultValueHandling", func(t *testing.T) { + testDefaultValueHandling(ctx, t, mwClient, bqClient, dataset) + }) }) } @@ -1262,6 +1264,97 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq ) } +func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) { + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.DefaultValueSchema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + + m := &testdata.DefaultValuesPartialSchema{ + // We only populate the id, as remaining fields are used to test default values. + Id: proto.String("someval"), + } + var data []byte + var err error + if data, err = proto.Marshal(m); err != nil { + t.Fatalf("failed to marshal test row data") + } + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. + opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID))) + opts = append(opts, WithSchemaDescriptor(descriptorProto)) + ms, err := mwClient.NewManagedStream(ctx, opts...) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + validateTableConstraints(ctx, t, bqClient, testTable, "before send", + withExactRowCount(0)) + + var result *AppendResult + + // Send one row, verify default values were set as expected. + + result, err = ms.AppendRows(ctx, [][]byte{data}) + if err != nil { + t.Errorf("append failed: %v", err) + } + // Wait for the result to indicate ready, then validate. + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after first row", + withExactRowCount(1), + withNonNullCount("id", 1), + withNullCount("strcol_withdef", 1), + withNullCount("intcol_withdef", 1), + withNullCount("otherstr_withdef", 0)) // not part of partial schema + + // Change default MVI to use nulls. + // We expect the fields in the partial schema to leverage nulls rather than default values. + // The fields outside the partial schema continue to obey default values. + result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE)) + if err != nil { + t.Errorf("append failed: %v", err) + } + // Wait for the result to indicate ready, then validate. + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi is DEFAULT_VALUE)", + withExactRowCount(2), + withNullCount("strcol_withdef", 1), // doesn't increment, as it gets default value + withNullCount("intcol_withdef", 1)) // doesn't increment, as it gets default value + + // Change per-column MVI to use default value + result, err = ms.AppendRows(ctx, [][]byte{data}, + UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE, + })) + if err != nil { + t.Errorf("append failed: %v", err) + } + // Wait for the result to indicate ready, then validate. + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after third row (explicit column mvi)", + withExactRowCount(3), + withNullCount("strcol_withdef", 2), // increments as it's null for this column + withNullCount("intcol_withdef", 1), // doesn't increment, still default value + withNonNullCount("otherstr_withdef", 3), // not part of descriptor, always gets default value + withNullCount("otherstr", 3), // not part of descriptor, always gets null + withNullCount("strcol", 3), // no default value defined, always gets null + withNullCount("intcol", 3), // no default value defined, always gets null + ) +} + func TestIntegration_DetectProjectID(t *testing.T) { ctx := context.Background() testCreds := testutil.Credentials(ctx) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index faaf9a776a48..090933d7e706 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -78,9 +78,9 @@ type ManagedStream struct { streamSettings *streamSettings // retains the current descriptor for the stream. - curDescVersion *descriptorVersion - c *Client - retry *statelessRetryer + curTemplate *versionedTemplate + c *Client + retry *statelessRetryer // writer state mu sync.Mutex @@ -298,13 +298,20 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ... return nil, err } // Ensure we build the request and pending write with a consistent schema version. - curSchemaVersion := ms.curDescVersion + curTemplate := ms.curTemplate req := ms.buildRequest(data) - pw := newPendingWrite(ctx, ms, req, curSchemaVersion, ms.streamSettings.streamID, ms.streamSettings.TraceID) + pw := newPendingWrite(ctx, ms, req, curTemplate, ms.streamSettings.streamID, ms.streamSettings.TraceID) // apply AppendOption opts for _, opt := range opts { opt(pw) } + // Post-request fixup after options are applied. + if pw.reqTmpl != nil { + if pw.reqTmpl.tmpl != nil { + // MVIs must be set on each request, but _default_ MVIs persist across the stream lifetime. Sigh. + pw.req.MissingValueInterpretations = pw.reqTmpl.tmpl.GetMissingValueInterpretations() + } + } // Call the underlying append. The stream has it's own retained context and will surface expiry on // it's own, but we also need to respect any deadline for the provided context. diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 9a69b2c24e12..6ec2ca584a7f 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -110,7 +110,7 @@ func TestManagedStream_RequestOptimization(t *testing.T) { } ms.streamSettings.streamID = "FOO" ms.streamSettings.TraceID = "TRACE" - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) fakeData := [][]byte{ []byte("foo"), @@ -191,7 +191,7 @@ func TestManagedStream_FlowControllerFailure(t *testing.T) { router.conn.fc = newFlowController(1, 0) router.conn.fc.acquire(ctx, 0) - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) fakeData := [][]byte{ []byte("foo"), @@ -236,7 +236,7 @@ func TestManagedStream_AppendWithDeadline(t *testing.T) { t.Errorf("addWriter: %v", err) } conn := router.conn - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) fakeData := [][]byte{ []byte("foo"), @@ -293,7 +293,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) { ctx: ctx, streamSettings: defaultStreamSettings(), } - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter: %v", err) } @@ -316,7 +316,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) { cancel() // First, append with an invalid context. - pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curDescVersion, "", "") + pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curTemplate, "", "") err := ms.appendWithRetry(pw) if err != context.Canceled { t.Errorf("expected cancelled context error, got: %v", err) @@ -457,7 +457,7 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) { ctx: ctx, streamSettings: defaultStreamSettings(), } - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter: %v", err) } @@ -509,7 +509,7 @@ func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) { retry: newStatelessRetryer(), } ms.retry.maxAttempts = 4 - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter: %v", err) } @@ -575,7 +575,7 @@ func TestManagedWriter_CancellationDuringRetry(t *testing.T) { streamSettings: defaultStreamSettings(), retry: newStatelessRetryer(), } - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter: %v", err) } @@ -624,7 +624,7 @@ func TestManagedStream_Closure(t *testing.T) { streamSettings: defaultStreamSettings(), } ms.ctx, ms.cancel = context.WithCancel(pool.ctx) - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter A: %v", err) } diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 47e296f68615..18f988123843 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -15,6 +15,7 @@ package managedwriter import ( + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" @@ -234,7 +235,53 @@ func WithTraceID(traceID string) WriterOption { // AppendRows calls on the stream. func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption { return func(ms *ManagedStream) { - ms.curDescVersion = newDescriptorVersion(dp) + ms.curTemplate = ms.curTemplate.revise(reviseProtoSchema(dp)) + } +} + +// WithMissingValueInterpretations controls how missing values are interpreted +// for individual columns. +// +// You must provide a map to indicate how to interpret missing value for some fields. Missing +// values are fields present in user schema but missing in rows. The key is +// the field name. The value is the interpretation of missing values for the +// field. +// +// For example, the following option would indicate that missing values in the "foo" +// column are interpreted as null, whereas missing values in the "bar" column are +// treated as the default value: +// +// WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ +// "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, +// "bar": storagepb.AppendRowsRequest_NULL_VALUE, +// }) +// +// If a field is not in this map and has missing values, the missing values +// in this field are interpreted as NULL unless overridden with a default missing +// value interpretation. +// +// Currently, field name can only be top-level column name, can't be a struct +// field path like 'foo.bar'. +func WithMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption { + return func(ms *ManagedStream) { + ms.curTemplate = ms.curTemplate.revise(reviseMissingValueInterpretations(mvi)) + } +} + +// WithDefaultMissingValueInterpretation controls how missing values are interpreted by +// for a given stream. See WithMissingValueIntepretations for more information about +// missing values. +// +// WithMissingValueIntepretations set for individual colums can override the default chosen +// with this option. +// +// For example, if you want to write +// `NULL` instead of using default values for some columns, you can set +// `default_missing_value_interpretation` to `DEFAULT_VALUE` and at the same +// time, set `missing_value_interpretations` to `NULL_VALUE` on those columns. +func WithDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption { + return func(ms *ManagedStream) { + ms.curTemplate = ms.curTemplate.revise(reviseDefaultMissingValueInterpretation(def)) } } @@ -278,8 +325,25 @@ type AppendOption func(*pendingWrite) // with a given stream. func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption { return func(pw *pendingWrite) { - // create a new descriptorVersion and attach it to the pending write. - pw.descVersion = newDescriptorVersion(schema) + pw.reqTmpl = pw.reqTmpl.revise(reviseProtoSchema(schema)) + } +} + +// UpdateMissingValueInterpretations updates the per-column missing-value intepretations settings, +// and is retained for subsequent writes. See the WithMissingValueInterpretations WriterOption for +// more details. +func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { + return func(pw *pendingWrite) { + pw.reqTmpl = pw.reqTmpl.revise(reviseMissingValueInterpretations(mvi)) + } +} + +// UpdateDefaultMissingValueInterpretation updates the default intepretations setting for the stream, +// and is retained for subsequent writes. See the WithDefaultMissingValueInterpretations WriterOption for +// more details. +func UpdateDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { + return func(pw *pendingWrite) { + pw.reqTmpl = pw.reqTmpl.revise(reviseDefaultMissingValueInterpretation(def)) } } diff --git a/bigquery/storage/managedwriter/options_test.go b/bigquery/storage/managedwriter/options_test.go index cb64d5e0b3a1..e8a64b6fdd6f 100644 --- a/bigquery/storage/managedwriter/options_test.go +++ b/bigquery/storage/managedwriter/options_test.go @@ -18,11 +18,15 @@ import ( "sync" "testing" + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/descriptorpb" ) func TestCustomClientOptions(t *testing.T) { @@ -140,6 +144,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.streamType = BufferedStream return ms @@ -151,6 +156,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.MaxInflightRequests = 2 return ms @@ -162,6 +168,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.MaxInflightBytes = 5 return ms @@ -173,6 +180,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.TraceID = "foo" return ms @@ -184,6 +192,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.destinationTable = "foo" return ms @@ -195,6 +204,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.dataOrigin = "origin" return ms @@ -206,6 +216,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))) @@ -218,11 +229,66 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.retry = newStatelessRetryer() return ms }(), }, + { + desc: "WithSchemaDescriptor", + options: []WriterOption{WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")})}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), + } + ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ + Rows: &storagepb.AppendRowsRequest_ProtoRows{ + ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ + WriterSchema: &storagepb.ProtoSchema{ + ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")}, + }, + }, + }, + } + return ms + }(), + }, + { + desc: "WithDefaultMissingValueInterpretation", + options: []WriterOption{WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE)}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), + } + ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ + DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE, + } + return ms + }(), + }, + { + desc: "WithtMissingValueInterpretations", + options: []WriterOption{WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, + "bar": storagepb.AppendRowsRequest_NULL_VALUE, + })}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), + } + ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ + MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, + "bar": storagepb.AppendRowsRequest_NULL_VALUE, + }, + } + return ms + }(), + }, { desc: "multiple", options: []WriterOption{ @@ -230,10 +296,31 @@ func TestWriterOptions(t *testing.T) { WithMaxInflightBytes(5), WithTraceID("traceid"), EnableWriteRetries(true), + WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")}), + WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE), + WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, + "bar": storagepb.AppendRowsRequest_NULL_VALUE, + }), }, want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), + } + ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ + Rows: &storagepb.AppendRowsRequest_ProtoRows{ + ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ + WriterSchema: &storagepb.ProtoSchema{ + ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")}, + }, + }, + }, + MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, + "bar": storagepb.AppendRowsRequest_NULL_VALUE, + }, + DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE, } ms.streamSettings.MaxInflightBytes = 5 ms.streamSettings.streamType = PendingStream @@ -247,6 +334,7 @@ func TestWriterOptions(t *testing.T) { for _, tc := range testCases { got := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } for _, o := range tc.options { o(got) @@ -255,8 +343,12 @@ func TestWriterOptions(t *testing.T) { if diff := cmp.Diff(got, tc.want, cmp.AllowUnexported(ManagedStream{}, streamSettings{}), cmp.AllowUnexported(sync.Mutex{}), + cmp.AllowUnexported(versionedTemplate{}), + cmpopts.IgnoreFields(versionedTemplate{}, "versionTime", "hashVal"), + protocmp.Transform(), // versionedTemplate embeds proto messages. cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" { t.Errorf("diff in case (%s):\n%v", tc.desc, diff) } + } } diff --git a/bigquery/storage/managedwriter/send_optimizer.go b/bigquery/storage/managedwriter/send_optimizer.go index 008fae50ee94..8feb9d2dee3e 100644 --- a/bigquery/storage/managedwriter/send_optimizer.go +++ b/bigquery/storage/managedwriter/send_optimizer.go @@ -15,6 +15,8 @@ package managedwriter import ( + "bytes" + "encoding/binary" "hash/crc32" "time" @@ -104,15 +106,15 @@ func (so *simplexOptimizer) isMultiplexing() bool { // Schema evolution is simply a case of sending the new WriterSchema as part of the request(s). No explicit // reconnection is necessary. type multiplexOptimizer struct { - prevStream string - prevDescriptorVersion *descriptorVersion - multiplexStreams bool + prevStream string + prevTemplate *versionedTemplate + multiplexStreams bool } func (mo *multiplexOptimizer) signalReset() { mo.prevStream = "" mo.multiplexStreams = false - mo.prevDescriptorVersion = nil + mo.prevTemplate = nil } func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error { @@ -123,7 +125,7 @@ func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRow err = arc.Send(req) if err == nil { mo.prevStream = req.GetWriteStream() - mo.prevDescriptorVersion = pw.descVersion + mo.prevTemplate = pw.reqTmpl } } else { // We have a previous send. Determine if it's the same stream or a different one. @@ -135,15 +137,15 @@ func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRow // swapOnSuccess tracks if we need to update schema versions on successful send. swapOnSuccess := false req := pw.req - if mo.prevDescriptorVersion != nil { - if !mo.prevDescriptorVersion.eqVersion(pw.descVersion) { + if mo.prevTemplate != nil { + if !mo.prevTemplate.Compatible(pw.reqTmpl) { swapOnSuccess = true req = pw.constructFullRequest(false) // full request minus traceID. } } err = arc.Send(req) if err == nil && swapOnSuccess { - mo.prevDescriptorVersion = pw.descVersion + mo.prevTemplate = pw.reqTmpl } } else { // The previous send was for a different stream. Send a full request, minus traceId. @@ -152,7 +154,7 @@ func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRow if err == nil { // Send successful. Update state to reflect this send is now the "previous" state. mo.prevStream = pw.writeStreamID - mo.prevDescriptorVersion = pw.descVersion + mo.prevTemplate = pw.reqTmpl } // Also, note that we've sent traffic for multiple streams, which means the backend recognizes this // is a multiplex stream as well. @@ -166,63 +168,104 @@ func (mo *multiplexOptimizer) isMultiplexing() bool { return mo.multiplexStreams } -// getDescriptorFromAppend is a utility method for extracting the deeply nested schema -// descriptor from a request. It returns a nil if the descriptor is not set. -func getDescriptorFromAppend(req *storagepb.AppendRowsRequest) *descriptorpb.DescriptorProto { - if pr := req.GetProtoRows(); pr != nil { - if ws := pr.GetWriterSchema(); ws != nil { - return ws.GetProtoDescriptor() - } - } - return nil +// versionedTemplate is used for faster comparison of the templated part of +// an AppendRowsRequest, which bears settings-like fields related to schema +// and default value configuration. Direct proto comparison through something +// like proto.Equal is far too expensive, so versionTemplate leverages a faster +// hash-based comparison to avoid the deep equality checks. +type versionedTemplate struct { + versionTime time.Time + hashVal uint32 + tmpl *storagepb.AppendRowsRequest } -// descriptorVersion is used for faster comparisons of proto descriptors. Deep equality comparisons -// of DescriptorProto can be very costly, so we use a simple versioning strategy based on -// time and a crc32 hash of the serialized proto bytes. -// -// The descriptorVersion is used for retaining schema, signalling schema change and optimizing requests. -type descriptorVersion struct { - versionTime time.Time - descriptorProto *descriptorpb.DescriptorProto - hashVal uint32 -} - -func newDescriptorVersion(in *descriptorpb.DescriptorProto) *descriptorVersion { - var hashVal uint32 - // It is a known issue that we may have non-deterministic serialization of a DescriptorProto - // due to the nature of protobuf. Our primary protection is the time-based version identifier, - // this hashing is primarily for time collisions. - if b, err := proto.Marshal(in); err == nil { - hashVal = crc32.ChecksumIEEE(b) +func newVersionedTemplate() *versionedTemplate { + vt := &versionedTemplate{ + versionTime: time.Now(), + tmpl: &storagepb.AppendRowsRequest{}, } - return &descriptorVersion{ - versionTime: time.Now(), - descriptorProto: proto.Clone(in).(*descriptorpb.DescriptorProto), - hashVal: hashVal, + vt.computeHash() + return vt +} + +// computeHash is an internal utility function for calculating the hash value +// for faster comparison. +func (vt *versionedTemplate) computeHash() { + buf := new(bytes.Buffer) + if b, err := proto.Marshal(vt.tmpl); err == nil { + buf.Write(b) + } else { + // if we fail to serialize the proto (unlikely), consume the timestamp for input instead. + binary.Write(buf, binary.LittleEndian, vt.versionTime.UnixNano()) } + vt.hashVal = crc32.ChecksumIEEE(buf.Bytes()) } -// eqVersion is the fast equality comparison that uses the versionTime and crc32 hash -// in place of deep proto equality. -func (dv *descriptorVersion) eqVersion(other *descriptorVersion) bool { - if dv == nil || other == nil { - return false +type templateRevisionF func(m *storagepb.AppendRowsRequest) + +// revise makes a new versionedTemplate from the existing template, applying any changes. +// The original revision is returned if there's no effective difference after changes are +// applied. +func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemplate { + before := vt + if before == nil { + before = newVersionedTemplate() + } + if len(changes) == 0 { + // if there's no changes, return the base revision immediately. + return before + } + out := &versionedTemplate{ + versionTime: time.Now(), + tmpl: proto.Clone(before.tmpl).(*storagepb.AppendRowsRequest), + } + for _, r := range changes { + r(out.tmpl) + } + out.computeHash() + if out.Compatible(before) { + // The changes didn't yield an measured difference. Return the base revision to avoid + // possible connection churn from no-op revisions. + return before } - if dv.versionTime != other.versionTime { - return false + return out +} + +// Compatible is effectively a fast equality check, that relies on the hash value +// and avoids the potentially very costly deep comparison of the proto message templates. +func (vt *versionedTemplate) Compatible(other *versionedTemplate) bool { + if other == nil { + return vt == nil } - if dv.hashVal == 0 || other.hashVal == 0 { - return false + return vt.hashVal == other.hashVal +} + +func reviseProtoSchema(newSchema *descriptorpb.DescriptorProto) templateRevisionF { + return func(m *storagepb.AppendRowsRequest) { + if m != nil { + m.Rows = &storagepb.AppendRowsRequest_ProtoRows{ + ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ + WriterSchema: &storagepb.ProtoSchema{ + ProtoDescriptor: proto.Clone(newSchema).(*descriptorpb.DescriptorProto), + }, + }, + } + } } - if dv.hashVal != other.hashVal { - return false +} + +func reviseMissingValueInterpretations(vi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF { + return func(m *storagepb.AppendRowsRequest) { + if m != nil { + m.MissingValueInterpretations = vi + } } - return true } -// isNewer reports whether the current schema bears a newer time (version) -// than the other. -func (dv *descriptorVersion) isNewer(other *descriptorVersion) bool { - return dv.versionTime.UnixNano() > other.versionTime.UnixNano() +func reviseDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF { + return func(m *storagepb.AppendRowsRequest) { + if m != nil { + m.DefaultMissingValueInterpretation = def + } + } } diff --git a/bigquery/storage/managedwriter/send_optimizer_test.go b/bigquery/storage/managedwriter/send_optimizer_test.go index 20253956b987..10948c5c1022 100644 --- a/bigquery/storage/managedwriter/send_optimizer_test.go +++ b/bigquery/storage/managedwriter/send_optimizer_test.go @@ -62,11 +62,11 @@ func TestSendOptimizer(t *testing.T) { description: "verbose-optimizer", optimizer: &verboseOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -84,11 +84,11 @@ func TestSendOptimizer(t *testing.T) { description: "simplex no errors", optimizer: &simplexOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -114,11 +114,11 @@ func TestSendOptimizer(t *testing.T) { description: "simplex w/partial errors", optimizer: &simplexOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -144,11 +144,11 @@ func TestSendOptimizer(t *testing.T) { description: "multiplex single all errors", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -166,11 +166,11 @@ func TestSendOptimizer(t *testing.T) { description: "multiplex single no errors", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -193,8 +193,8 @@ func TestSendOptimizer(t *testing.T) { description: "multiplex interleave", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { - dvA := newDescriptorVersion(exampleDP) - dvB := newDescriptorVersion(protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())) + tmplA := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) + tmplB := newVersionedTemplate().revise(reviseProtoSchema(protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor()))) reqA := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) reqA.WriteStream = "alpha" @@ -203,16 +203,16 @@ func TestSendOptimizer(t *testing.T) { reqB.WriteStream = "beta" writes := make([]*pendingWrite, 10) - writes[0] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) - writes[1] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) - writes[2] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[3] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) - writes[4] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[5] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[6] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[7] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[8] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) - writes[9] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) + writes[0] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) + writes[1] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) + writes[2] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[3] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) + writes[4] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[5] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[6] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[7] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[8] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) + writes[9] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) return writes }(), @@ -270,16 +270,16 @@ func TestSendOptimizer(t *testing.T) { description: "multiplex w/evolution", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { - dvOld := newDescriptorVersion(exampleDP) - dvNew := newDescriptorVersion(&descriptorpb.DescriptorProto{Name: proto.String("new")}) + tmplOld := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) + tmplNew := tmplOld.revise(reviseProtoSchema(&descriptorpb.DescriptorProto{Name: proto.String("new")})) example := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) writes := make([]*pendingWrite, 4) - writes[0] = newPendingWrite(ctx, nil, example, dvOld, exampleStreamID, exampleTraceID) - writes[1] = newPendingWrite(ctx, nil, example, dvOld, exampleStreamID, exampleTraceID) - writes[2] = newPendingWrite(ctx, nil, example, dvNew, exampleStreamID, exampleTraceID) - writes[3] = newPendingWrite(ctx, nil, example, dvNew, exampleStreamID, exampleTraceID) + writes[0] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID) + writes[1] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID) + writes[2] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID) + writes[3] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID) return writes }(), @@ -336,59 +336,63 @@ func TestSendOptimizer(t *testing.T) { } } -func TestDescriptorVersion_EqVersion(t *testing.T) { - - exampleDV := newDescriptorVersion(&descriptorpb.DescriptorProto{Name: proto.String("foo")}) - copiedExampleDV := &descriptorVersion{versionTime: exampleDV.versionTime, descriptorProto: &descriptorpb.DescriptorProto{Name: proto.String("foo")}, hashVal: exampleDV.hashVal} - exampleDV2 := &descriptorVersion{versionTime: exampleDV.versionTime, descriptorProto: &descriptorpb.DescriptorProto{Name: proto.String("bar")}, hashVal: exampleDV.hashVal} - +func TestVersionedTemplate(t *testing.T) { testCases := []struct { - desc string - current *descriptorVersion - other *descriptorVersion - want bool + desc string + inputTmpl *storagepb.AppendRowsRequest + changes []templateRevisionF + wantCompatible bool }{ { - desc: "both nil", - }, - { - desc: "nil current", - other: newDescriptorVersion(&descriptorpb.DescriptorProto{}), - want: false, + desc: "nil template", + wantCompatible: true, }, { - desc: "nil other", - current: newDescriptorVersion(&descriptorpb.DescriptorProto{}), - want: false, + desc: "no changes", + inputTmpl: &storagepb.AppendRowsRequest{}, + wantCompatible: true, }, { - desc: "mismatched", - current: newDescriptorVersion(&descriptorpb.DescriptorProto{}), - other: newDescriptorVersion(&descriptorpb.DescriptorProto{}), - want: false, - }, - { - desc: "equal, same reference", - current: exampleDV, - other: exampleDV, - want: true, + desc: "empty schema", + inputTmpl: &storagepb.AppendRowsRequest{}, + changes: []templateRevisionF{ + reviseProtoSchema(nil), + }, + wantCompatible: false, }, { - desc: "equal, different references", - current: exampleDV, - other: copiedExampleDV, - want: true, + desc: "same default mvi", + inputTmpl: &storagepb.AppendRowsRequest{ + DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE, + }, + changes: []templateRevisionF{ + reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_NULL_VALUE), + }, + wantCompatible: true, }, { - desc: "almost equal aka a collision", - current: exampleDV, - other: exampleDV2, - want: true, + desc: "differing default mvi", + inputTmpl: &storagepb.AppendRowsRequest{ + DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE, + }, + changes: []templateRevisionF{ + reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE), + }, + wantCompatible: false, }, } + for _, tc := range testCases { - if got := tc.current.eqVersion(tc.other); got != tc.want { - t.Errorf("case %q, got %t want %t", tc.desc, got, tc.want) + orig := newVersionedTemplate() + orig.tmpl = tc.inputTmpl + orig.computeHash() + + rev := orig.revise(tc.changes...) + if orig.Compatible(rev) != rev.Compatible(orig) { + t.Errorf("case %q: inconsistent compatibility, orig %t rev %t", tc.desc, orig.Compatible(rev), rev.Compatible(orig)) + } + if got := orig.Compatible(rev); tc.wantCompatible != got { + t.Errorf("case %q: Compatible mismatch, got %t want %t", tc.desc, got, tc.wantCompatible) } } } diff --git a/bigquery/storage/managedwriter/testdata/schemas.go b/bigquery/storage/managedwriter/testdata/schemas.go index d1d73dd2557f..8d949a288196 100644 --- a/bigquery/storage/managedwriter/testdata/schemas.go +++ b/bigquery/storage/managedwriter/testdata/schemas.go @@ -296,4 +296,38 @@ var ( Type: bigquery.IntegerFieldType, }, } + + DefaultValueSchema bigquery.Schema = bigquery.Schema{ + { + Name: "id", + Type: bigquery.StringFieldType, + }, + { + Name: "strcol", + Type: bigquery.StringFieldType, + }, + { + Name: "strcol_withdef", + Type: bigquery.StringFieldType, + DefaultValueExpression: "\"defaultvalue\"", + }, + { + Name: "intcol", + Type: bigquery.IntegerFieldType, + }, + { + Name: "intcol_withdef", + Type: bigquery.IntegerFieldType, + DefaultValueExpression: "-99", + }, + { + Name: "otherstr", + Type: bigquery.StringFieldType, + }, + { + Name: "otherstr_withdef", + Type: bigquery.StringFieldType, + DefaultValueExpression: "\"otherval\"", + }, + } ) diff --git a/bigquery/storage/managedwriter/testdata/testing.pb.go b/bigquery/storage/managedwriter/testdata/testing.pb.go index 75ae418b7d4c..992da63955b3 100644 --- a/bigquery/storage/managedwriter/testdata/testing.pb.go +++ b/bigquery/storage/managedwriter/testdata/testing.pb.go @@ -14,7 +14,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.27.1 +// protoc-gen-go v1.28.0 // protoc v3.17.3 // source: testing.proto @@ -660,6 +660,180 @@ func (*WithOneOf_StringValue) isWithOneOf_OneofValue() {} func (*WithOneOf_DoubleValue) isWithOneOf_OneofValue() {} +type DefaultValues struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Strcol *string `protobuf:"bytes,2,opt,name=strcol" json:"strcol,omitempty"` + StrcolWithdef *string `protobuf:"bytes,3,opt,name=strcol_withdef,json=strcolWithdef" json:"strcol_withdef,omitempty"` + Intcol *int64 `protobuf:"varint,4,opt,name=intcol" json:"intcol,omitempty"` + IntcolWithdef *int64 `protobuf:"varint,5,opt,name=intcol_withdef,json=intcolWithdef" json:"intcol_withdef,omitempty"` + Otherstr *string `protobuf:"bytes,6,opt,name=otherstr" json:"otherstr,omitempty"` + OtherstrDefault *string `protobuf:"bytes,7,opt,name=otherstr_default,json=otherstrDefault" json:"otherstr_default,omitempty"` +} + +func (x *DefaultValues) Reset() { + *x = DefaultValues{} + if protoimpl.UnsafeEnabled { + mi := &file_testing_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DefaultValues) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DefaultValues) ProtoMessage() {} + +func (x *DefaultValues) ProtoReflect() protoreflect.Message { + mi := &file_testing_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DefaultValues.ProtoReflect.Descriptor instead. +func (*DefaultValues) Descriptor() ([]byte, []int) { + return file_testing_proto_rawDescGZIP(), []int{9} +} + +func (x *DefaultValues) GetId() string { + if x != nil && x.Id != nil { + return *x.Id + } + return "" +} + +func (x *DefaultValues) GetStrcol() string { + if x != nil && x.Strcol != nil { + return *x.Strcol + } + return "" +} + +func (x *DefaultValues) GetStrcolWithdef() string { + if x != nil && x.StrcolWithdef != nil { + return *x.StrcolWithdef + } + return "" +} + +func (x *DefaultValues) GetIntcol() int64 { + if x != nil && x.Intcol != nil { + return *x.Intcol + } + return 0 +} + +func (x *DefaultValues) GetIntcolWithdef() int64 { + if x != nil && x.IntcolWithdef != nil { + return *x.IntcolWithdef + } + return 0 +} + +func (x *DefaultValues) GetOtherstr() string { + if x != nil && x.Otherstr != nil { + return *x.Otherstr + } + return "" +} + +func (x *DefaultValues) GetOtherstrDefault() string { + if x != nil && x.OtherstrDefault != nil { + return *x.OtherstrDefault + } + return "" +} + +type DefaultValuesPartialSchema struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Strcol *string `protobuf:"bytes,2,opt,name=strcol" json:"strcol,omitempty"` + StrcolWithdef *string `protobuf:"bytes,3,opt,name=strcol_withdef,json=strcolWithdef" json:"strcol_withdef,omitempty"` + Intcol *int64 `protobuf:"varint,4,opt,name=intcol" json:"intcol,omitempty"` + IntcolWithdef *int64 `protobuf:"varint,5,opt,name=intcol_withdef,json=intcolWithdef" json:"intcol_withdef,omitempty"` +} + +func (x *DefaultValuesPartialSchema) Reset() { + *x = DefaultValuesPartialSchema{} + if protoimpl.UnsafeEnabled { + mi := &file_testing_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DefaultValuesPartialSchema) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DefaultValuesPartialSchema) ProtoMessage() {} + +func (x *DefaultValuesPartialSchema) ProtoReflect() protoreflect.Message { + mi := &file_testing_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DefaultValuesPartialSchema.ProtoReflect.Descriptor instead. +func (*DefaultValuesPartialSchema) Descriptor() ([]byte, []int) { + return file_testing_proto_rawDescGZIP(), []int{10} +} + +func (x *DefaultValuesPartialSchema) GetId() string { + if x != nil && x.Id != nil { + return *x.Id + } + return "" +} + +func (x *DefaultValuesPartialSchema) GetStrcol() string { + if x != nil && x.Strcol != nil { + return *x.Strcol + } + return "" +} + +func (x *DefaultValuesPartialSchema) GetStrcolWithdef() string { + if x != nil && x.StrcolWithdef != nil { + return *x.StrcolWithdef + } + return "" +} + +func (x *DefaultValuesPartialSchema) GetIntcol() int64 { + if x != nil && x.Intcol != nil { + return *x.Intcol + } + return 0 +} + +func (x *DefaultValuesPartialSchema) GetIntcolWithdef() int64 { + if x != nil && x.IntcolWithdef != nil { + return *x.IntcolWithdef + } + return 0 +} + var File_testing_proto protoreflect.FileDescriptor var file_testing_proto_rawDesc = []byte{ @@ -741,14 +915,39 @@ var file_testing_proto_rawDesc = []byte{ 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x23, 0x0a, 0x0c, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x01, 0x48, 0x00, 0x52, 0x0b, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x0d, 0x0a, - 0x0b, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x2a, 0x28, 0x0a, 0x08, - 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, - 0x45, 0x6e, 0x75, 0x6d, 0x30, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, 0x45, - 0x6e, 0x75, 0x6d, 0x31, 0x10, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2f, 0x62, 0x69, - 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6d, - 0x61, 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, - 0x74, 0x64, 0x61, 0x74, 0x61, + 0x0b, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xe4, 0x01, 0x0a, + 0x0d, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x16, + 0x0a, 0x06, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, + 0x5f, 0x77, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, + 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x12, 0x16, 0x0a, + 0x06, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x69, + 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x5f, + 0x77, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x69, + 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x12, 0x1a, 0x0a, 0x08, + 0x6f, 0x74, 0x68, 0x65, 0x72, 0x73, 0x74, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x6f, 0x74, 0x68, 0x65, 0x72, 0x73, 0x74, 0x72, 0x12, 0x29, 0x0a, 0x10, 0x6f, 0x74, 0x68, 0x65, + 0x72, 0x73, 0x74, 0x72, 0x5f, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x18, 0x07, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0f, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x73, 0x74, 0x72, 0x44, 0x65, 0x66, 0x61, + 0x75, 0x6c, 0x74, 0x22, 0xaa, 0x01, 0x0a, 0x1a, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x56, + 0x61, 0x6c, 0x75, 0x65, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, + 0x6d, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x74, + 0x72, 0x63, 0x6f, 0x6c, 0x5f, 0x77, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, + 0x66, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x06, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x69, 0x6e, 0x74, + 0x63, 0x6f, 0x6c, 0x5f, 0x77, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0d, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, + 0x2a, 0x28, 0x0a, 0x08, 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x12, 0x0d, 0x0a, 0x09, + 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x30, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x54, + 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x31, 0x10, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, + 0x6f, 0x2f, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, + 0x67, 0x65, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, + 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, } var ( @@ -764,25 +963,27 @@ func file_testing_proto_rawDescGZIP() []byte { } var file_testing_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_testing_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_testing_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_testing_proto_goTypes = []interface{}{ - (TestEnum)(0), // 0: testdata.TestEnum - (*AllSupportedTypes)(nil), // 1: testdata.AllSupportedTypes - (*WithWellKnownTypes)(nil), // 2: testdata.WithWellKnownTypes - (*InnerType)(nil), // 3: testdata.InnerType - (*NestedType)(nil), // 4: testdata.NestedType - (*ComplexType)(nil), // 5: testdata.ComplexType - (*ContainsRecursive)(nil), // 6: testdata.ContainsRecursive - (*RecursiveType)(nil), // 7: testdata.RecursiveType - (*RecursiveTypeTopMessage)(nil), // 8: testdata.RecursiveTypeTopMessage - (*WithOneOf)(nil), // 9: testdata.WithOneOf - (*wrapperspb.Int64Value)(nil), // 10: google.protobuf.Int64Value - (*wrapperspb.StringValue)(nil), // 11: google.protobuf.StringValue + (TestEnum)(0), // 0: testdata.TestEnum + (*AllSupportedTypes)(nil), // 1: testdata.AllSupportedTypes + (*WithWellKnownTypes)(nil), // 2: testdata.WithWellKnownTypes + (*InnerType)(nil), // 3: testdata.InnerType + (*NestedType)(nil), // 4: testdata.NestedType + (*ComplexType)(nil), // 5: testdata.ComplexType + (*ContainsRecursive)(nil), // 6: testdata.ContainsRecursive + (*RecursiveType)(nil), // 7: testdata.RecursiveType + (*RecursiveTypeTopMessage)(nil), // 8: testdata.RecursiveTypeTopMessage + (*WithOneOf)(nil), // 9: testdata.WithOneOf + (*DefaultValues)(nil), // 10: testdata.DefaultValues + (*DefaultValuesPartialSchema)(nil), // 11: testdata.DefaultValuesPartialSchema + (*wrapperspb.Int64Value)(nil), // 12: google.protobuf.Int64Value + (*wrapperspb.StringValue)(nil), // 13: google.protobuf.StringValue } var file_testing_proto_depIdxs = []int32{ 0, // 0: testdata.AllSupportedTypes.enum_value:type_name -> testdata.TestEnum - 10, // 1: testdata.WithWellKnownTypes.wrapped_int64:type_name -> google.protobuf.Int64Value - 11, // 2: testdata.WithWellKnownTypes.wrapped_string:type_name -> google.protobuf.StringValue + 12, // 1: testdata.WithWellKnownTypes.wrapped_int64:type_name -> google.protobuf.Int64Value + 13, // 2: testdata.WithWellKnownTypes.wrapped_string:type_name -> google.protobuf.StringValue 3, // 3: testdata.NestedType.inner_type:type_name -> testdata.InnerType 4, // 4: testdata.ComplexType.nested_repeated_type:type_name -> testdata.NestedType 3, // 5: testdata.ComplexType.inner_type:type_name -> testdata.InnerType @@ -910,6 +1111,30 @@ func file_testing_proto_init() { return nil } } + file_testing_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DefaultValues); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_testing_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DefaultValuesPartialSchema); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_testing_proto_msgTypes[8].OneofWrappers = []interface{}{ (*WithOneOf_StringValue)(nil), @@ -921,7 +1146,7 @@ func file_testing_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_testing_proto_rawDesc, NumEnums: 1, - NumMessages: 9, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/bigquery/storage/managedwriter/testdata/testing.proto b/bigquery/storage/managedwriter/testdata/testing.proto index 36f676521309..79055b0a64f6 100644 --- a/bigquery/storage/managedwriter/testdata/testing.proto +++ b/bigquery/storage/managedwriter/testdata/testing.proto @@ -73,4 +73,22 @@ message WithOneOf { string string_value = 2; double double_value = 3; } +} + +message DefaultValues { + optional string id = 1; + optional string strcol = 2; + optional string strcol_withdef = 3; + optional int64 intcol = 4; + optional int64 intcol_withdef = 5; + optional string otherstr = 6; + optional string otherstr_default = 7; +} + +message DefaultValuesPartialSchema { + optional string id = 1; + optional string strcol = 2; + optional string strcol_withdef = 3; + optional int64 intcol = 4; + optional int64 intcol_withdef = 5; } \ No newline at end of file