From 4bc8fd6c11bbb4f1bf19182f602e1fd3db3cbd85 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 11 Oct 2023 03:18:23 +0000 Subject: [PATCH 1/9] feat(bigquery/storage/managedwriter): refactor to add versionedTemplate This feature introduces a new abstraction, the versionedTemplate. The intent is for this to replace the existing schema versioning mechanism with something more general and robust. Once the swap is complete, we can support default value changes and schema changes through the same templating mechanism. --- .../storage/managedwriter/send_optimizer.go | 93 +++++++++++++++++++ .../managedwriter/send_optimizer_test.go | 61 ++++++++++++ 2 files changed, 154 insertions(+) diff --git a/bigquery/storage/managedwriter/send_optimizer.go b/bigquery/storage/managedwriter/send_optimizer.go index 008fae50ee94..a5ae1f17a1a9 100644 --- a/bigquery/storage/managedwriter/send_optimizer.go +++ b/bigquery/storage/managedwriter/send_optimizer.go @@ -15,6 +15,8 @@ package managedwriter import ( + "bytes" + "encoding/binary" "hash/crc32" "time" @@ -226,3 +228,94 @@ func (dv *descriptorVersion) eqVersion(other *descriptorVersion) bool { func (dv *descriptorVersion) isNewer(other *descriptorVersion) bool { return dv.versionTime.UnixNano() > other.versionTime.UnixNano() } + +// versionedTemplate is used for faster comparison of the templated part of +// an AppendRowsRequest, which bears settings-like fields related to schema +// and default value configuration. Direct proto comparison through something +// like proto.Equal is far too expensive, so versionTemplate leverages a faster +// hash-based comparison to avoid the deep equality checks. +type versionedTemplate struct { + versionTime time.Time + hashVal uint32 + tmpl *storagepb.AppendRowsRequest +} + +func newVersionedTemplate() *versionedTemplate { + vt := &versionedTemplate{ + versionTime: time.Now(), + tmpl: &storagepb.AppendRowsRequest{}, + } + vt.computeHash() + return vt +} + +// computeHash is an internal utility function for calculating the hash value +// for faster comparison. +func (vt *versionedTemplate) computeHash() { + buf := new(bytes.Buffer) + if b, err := proto.Marshal(vt.tmpl); err == nil { + buf.Write(b) + } else { + // if we fail to serialize the proto (unlikely), consume the timestamp for input instead. + binary.Write(buf, binary.LittleEndian, vt.versionTime.UnixNano()) + } + vt.hashVal = crc32.ChecksumIEEE(buf.Bytes()) +} + +type templateRevisionF func(m *storagepb.AppendRowsRequest) + +// revise makes a new versionedTemplate from the existing template, applying any changes +func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemplate { + if len(changes) == 0 { + // if there's no changes, simply return the base revision + return vt + } + out := &versionedTemplate{ + versionTime: time.Now(), + tmpl: proto.Clone(vt.tmpl).(*storagepb.AppendRowsRequest), + } + for _, r := range changes { + r(out.tmpl) + } + out.computeHash() + return out +} + +// Compatible is effectively a fast equality check, that relies on the hash value +// and avoids the potentially very costly deep comparison of the proto message templates. +func (vt *versionedTemplate) Compatible(other *versionedTemplate) bool { + if other == nil { + return vt == nil + } + return vt.hashVal == other.hashVal +} + +func reviseProtoSchema(newSchema *descriptorpb.DescriptorProto) templateRevisionF { + return func(m *storagepb.AppendRowsRequest) { + if m != nil { + m.Rows = &storagepb.AppendRowsRequest_ProtoRows{ + ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ + WriterSchema: &storagepb.ProtoSchema{ + ProtoDescriptor: proto.Clone(newSchema).(*descriptorpb.DescriptorProto), + }, + }, + } + } + } +} + +func reviseMissingValueInterpretations(vi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF { + return func(m *storagepb.AppendRowsRequest) { + if m != nil { + m.MissingValueInterpretations = vi + } + } +} + +func reviseDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF { + return func(m *storagepb.AppendRowsRequest) { + if m != nil { + m.DefaultMissingValueInterpretation = def + } + } +} diff --git a/bigquery/storage/managedwriter/send_optimizer_test.go b/bigquery/storage/managedwriter/send_optimizer_test.go index 20253956b987..5b0848b61a0a 100644 --- a/bigquery/storage/managedwriter/send_optimizer_test.go +++ b/bigquery/storage/managedwriter/send_optimizer_test.go @@ -392,3 +392,64 @@ func TestDescriptorVersion_EqVersion(t *testing.T) { } } } + +func TestVersionedTemplate(t *testing.T) { + testCases := []struct { + desc string + inputTmpl *storagepb.AppendRowsRequest + changes []templateRevisionF + wantCompatible bool + }{ + { + desc: "nil template", + wantCompatible: true, + }, + { + desc: "no changes", + inputTmpl: &storagepb.AppendRowsRequest{}, + wantCompatible: true, + }, + { + desc: "empty schema", + inputTmpl: &storagepb.AppendRowsRequest{}, + changes: []templateRevisionF{ + reviseProtoSchema(nil), + }, + wantCompatible: false, + }, + { + desc: "same default mvi", + inputTmpl: &storagepb.AppendRowsRequest{ + DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE, + }, + changes: []templateRevisionF{ + reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_NULL_VALUE), + }, + wantCompatible: true, + }, + { + desc: "differing default mvi", + inputTmpl: &storagepb.AppendRowsRequest{ + DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE, + }, + changes: []templateRevisionF{ + reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE), + }, + wantCompatible: false, + }, + } + + for _, tc := range testCases { + orig := newVersionedTemplate() + orig.tmpl = tc.inputTmpl + orig.computeHash() + + rev := orig.revise(tc.changes...) + if orig.Compatible(rev) != rev.Compatible(orig) { + t.Errorf("case %q: inconsistent compatibility, orig %t rev %t", tc.desc, orig.Compatible(rev), rev.Compatible(orig)) + } + if got := orig.Compatible(rev); tc.wantCompatible != got { + t.Errorf("case %q: Compatible mismatch, got %t want %t", tc.desc, got, tc.wantCompatible) + } + } +} From 3e86322544f5a15717dab7d6d714f737364985d9 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 13 Oct 2023 01:28:19 +0000 Subject: [PATCH 2/9] feat(bigquery/storage/managedwriter): support default value controls In terms of public surface, this PR adds new options to control how missing values are interpreted when writing. For ManagedStream instantiation, the options are: * WithDefaultMissingValueInterpretation (blanket setting for all columns) * WithMissingValueInterpretations (per-column settings) To support updates, these are added as AppendOptions: * UpdateDefaultMissingValueInterpretation * UpdateMissingValueInterpretations Implementation-wise, this PR rips out the previous schema-specific versioner and expands the concept to a versioned AppendRowsRequest template. This more general mechanism allows us to version all settings that manifest as request fields in the AppendRowsRequest. --- .../storage/managedwriter/appendresult.go | 33 ++--- .../managedwriter/appendresult_test.go | 15 +- bigquery/storage/managedwriter/client.go | 1 + bigquery/storage/managedwriter/connection.go | 18 ++- .../storage/managedwriter/managed_stream.go | 10 +- .../managedwriter/managed_stream_test.go | 18 +-- bigquery/storage/managedwriter/options.go | 78 +++++++++- .../storage/managedwriter/options_test.go | 92 ++++++++++++ .../storage/managedwriter/send_optimizer.go | 79 ++--------- .../managedwriter/send_optimizer_test.go | 133 +++++------------- 10 files changed, 264 insertions(+), 213 deletions(-) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index 47959eb14b5e..46709e70bf60 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -166,7 +166,7 @@ type pendingWrite struct { // likely outcome when processing requests and it allows us to be efficient on send. // We retain the additional information to build the complete request in the related fields. req *storagepb.AppendRowsRequest - descVersion *descriptorVersion // schema at time of creation + reqTmpl *versionedTemplate // request template at time of creation traceID string writeStreamID string @@ -188,21 +188,21 @@ type pendingWrite struct { // to the pending results for later consumption. The provided context is // embedded in the pending write, as the write may be retried and we want // to respect the original context for expiry/cancellation etc. -func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, curDescVersion *descriptorVersion, writeStreamID, traceID string) *pendingWrite { +func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, reqTmpl *versionedTemplate, writeStreamID, traceID string) *pendingWrite { pw := &pendingWrite{ writer: src, result: newAppendResult(), reqCtx: ctx, - req: req, - descVersion: curDescVersion, + req: req, // minimal req, typically just row data + reqTmpl: reqTmpl, // remainder of templated request writeStreamID: writeStreamID, traceID: traceID, } // Compute the approx size for flow control purposes. pw.reqSize = proto.Size(pw.req) + len(writeStreamID) + len(traceID) - if pw.descVersion != nil { - pw.reqSize += proto.Size(pw.descVersion.descriptorProto) + if pw.reqTmpl != nil { + pw.reqSize += proto.Size(pw.reqTmpl.tmpl) } return pw } @@ -221,33 +221,22 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error) close(pw.result.ready) // Cleanup references remaining on the write explicitly. pw.req = nil - pw.descVersion = nil + pw.reqTmpl = nil pw.writer = nil pw.reqCtx = nil } func (pw *pendingWrite) constructFullRequest(addTrace bool) *storagepb.AppendRowsRequest { req := &storagepb.AppendRowsRequest{} + if pw.reqTmpl != nil { + req = proto.Clone(pw.reqTmpl.tmpl).(*storagepb.AppendRowsRequest) + } if pw.req != nil { - req = proto.Clone(pw.req).(*storagepb.AppendRowsRequest) + proto.Merge(req, pw.req) } if addTrace { req.TraceId = buildTraceID(&streamSettings{TraceID: pw.traceID}) } req.WriteStream = pw.writeStreamID - if pw.descVersion != nil { - ps := &storagepb.ProtoSchema{ - ProtoDescriptor: pw.descVersion.descriptorProto, - } - if pr := req.GetProtoRows(); pr != nil { - pr.WriterSchema = ps - } else { - req.Rows = &storagepb.AppendRowsRequest_ProtoRows{ - ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ - WriterSchema: ps, - }, - } - } - } return req } diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go index 43a3804633de..ced180295ef7 100644 --- a/bigquery/storage/managedwriter/appendresult_test.go +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -132,7 +132,8 @@ func TestPendingWrite(t *testing.T) { func TestPendingWrite_ConstructFullRequest(t *testing.T) { testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")} - testDV := newDescriptorVersion(testDP) + testTmpl := newVersionedTemplate().revise(reviseProtoSchema(testDP)) + testEmptyTraceID := buildTraceID(&streamSettings{}) for _, tc := range []struct { @@ -144,7 +145,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) { { desc: "nil request", pw: &pendingWrite{ - descVersion: testDV, + reqTmpl: testTmpl, }, want: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ @@ -159,8 +160,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) { { desc: "empty req w/trace", pw: &pendingWrite{ - req: &storagepb.AppendRowsRequest{}, - descVersion: testDV, + req: &storagepb.AppendRowsRequest{}, + reqTmpl: testTmpl, }, addTrace: true, want: &storagepb.AppendRowsRequest{ @@ -177,8 +178,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) { { desc: "basic req", pw: &pendingWrite{ - req: &storagepb.AppendRowsRequest{}, - descVersion: testDV, + req: &storagepb.AppendRowsRequest{}, + reqTmpl: testTmpl, }, want: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ @@ -194,7 +195,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) { desc: "everything w/trace", pw: &pendingWrite{ req: &storagepb.AppendRowsRequest{}, - descVersion: testDV, + reqTmpl: testTmpl, traceID: "foo", writeStreamID: "streamid", }, diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index d75e711a0ef8..ec872ec13861 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -151,6 +151,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient id: newUUID(writerIDPrefix), c: c, streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } // apply writer options. for _, opt := range opts { diff --git a/bigquery/storage/managedwriter/connection.go b/bigquery/storage/managedwriter/connection.go index d41ecc6e053a..5c3d81f1c59b 100644 --- a/bigquery/storage/managedwriter/connection.go +++ b/bigquery/storage/managedwriter/connection.go @@ -376,8 +376,22 @@ func (co *connection) lockingAppend(pw *pendingWrite) error { // Additionally, we check multiplex status as schema changes for explicit streams // require reconnect, whereas multiplex does not. forceReconnect := false - if pw.writer != nil && pw.descVersion != nil && pw.descVersion.isNewer(pw.writer.curDescVersion) { - pw.writer.curDescVersion = pw.descVersion + promoted := false + if pw.writer != nil && pw.reqTmpl != nil { + if !pw.reqTmpl.Compatible(pw.writer.curTemplate) { + if pw.writer.curTemplate == nil { + // promote because there's no current template + pw.writer.curTemplate = pw.reqTmpl + promoted = true + } else { + if pw.writer.curTemplate.versionTime.Before(pw.reqTmpl.versionTime) { + pw.writer.curTemplate = pw.reqTmpl + promoted = true + } + } + } + } + if promoted { if co.optimizer == nil { forceReconnect = true } else { diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index faaf9a776a48..ee73e32502b9 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -78,9 +78,9 @@ type ManagedStream struct { streamSettings *streamSettings // retains the current descriptor for the stream. - curDescVersion *descriptorVersion - c *Client - retry *statelessRetryer + curTemplate *versionedTemplate + c *Client + retry *statelessRetryer // writer state mu sync.Mutex @@ -298,9 +298,9 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ... return nil, err } // Ensure we build the request and pending write with a consistent schema version. - curSchemaVersion := ms.curDescVersion + curTemplate := ms.curTemplate req := ms.buildRequest(data) - pw := newPendingWrite(ctx, ms, req, curSchemaVersion, ms.streamSettings.streamID, ms.streamSettings.TraceID) + pw := newPendingWrite(ctx, ms, req, curTemplate, ms.streamSettings.streamID, ms.streamSettings.TraceID) // apply AppendOption opts for _, opt := range opts { opt(pw) diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 9a69b2c24e12..6ec2ca584a7f 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -110,7 +110,7 @@ func TestManagedStream_RequestOptimization(t *testing.T) { } ms.streamSettings.streamID = "FOO" ms.streamSettings.TraceID = "TRACE" - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) fakeData := [][]byte{ []byte("foo"), @@ -191,7 +191,7 @@ func TestManagedStream_FlowControllerFailure(t *testing.T) { router.conn.fc = newFlowController(1, 0) router.conn.fc.acquire(ctx, 0) - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) fakeData := [][]byte{ []byte("foo"), @@ -236,7 +236,7 @@ func TestManagedStream_AppendWithDeadline(t *testing.T) { t.Errorf("addWriter: %v", err) } conn := router.conn - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) fakeData := [][]byte{ []byte("foo"), @@ -293,7 +293,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) { ctx: ctx, streamSettings: defaultStreamSettings(), } - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter: %v", err) } @@ -316,7 +316,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) { cancel() // First, append with an invalid context. - pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curDescVersion, "", "") + pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curTemplate, "", "") err := ms.appendWithRetry(pw) if err != context.Canceled { t.Errorf("expected cancelled context error, got: %v", err) @@ -457,7 +457,7 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) { ctx: ctx, streamSettings: defaultStreamSettings(), } - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter: %v", err) } @@ -509,7 +509,7 @@ func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) { retry: newStatelessRetryer(), } ms.retry.maxAttempts = 4 - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter: %v", err) } @@ -575,7 +575,7 @@ func TestManagedWriter_CancellationDuringRetry(t *testing.T) { streamSettings: defaultStreamSettings(), retry: newStatelessRetryer(), } - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter: %v", err) } @@ -624,7 +624,7 @@ func TestManagedStream_Closure(t *testing.T) { streamSettings: defaultStreamSettings(), } ms.ctx, ms.cancel = context.WithCancel(pool.ctx) - ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{}) + ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{})) if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter A: %v", err) } diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 47e296f68615..4181204bb1b9 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -15,6 +15,7 @@ package managedwriter import ( + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" @@ -234,7 +235,49 @@ func WithTraceID(traceID string) WriterOption { // AppendRows calls on the stream. func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption { return func(ms *ManagedStream) { - ms.curDescVersion = newDescriptorVersion(dp) + ms.curTemplate = ms.curTemplate.revise(reviseProtoSchema(dp)) + } +} + +// WithMissingValueInterpretations controls over how missing values are interpreted +// for individual columns. +// +// You must provide a map to indicate how to interpret missing value for some fields. Missing +// values are fields present in user schema but missing in rows. The key is +// the field name. The value is the interpretation of missing values for the +// field. +// +// For example, a map {'foo': NULL_VALUE, 'bar': DEFAULT_VALUE} means all +// missing values in field foo are interpreted as NULL, all missing values in +// field bar are interpreted as the default value of field bar in table +// schema. +// +// If a field is not in this map and has missing values, the missing values +// in this field are interpreted as NULL. +// +// Currently, field name can only be top-level column name, can't be a struct +// field path like 'foo.bar'. +func WithMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption { + return func(ms *ManagedStream) { + ms.curTemplate = ms.curTemplate.revise(reviseMissingValueInterpretations(mvi)) + } +} + +// WithDefaultMissingValueInterpretation controls how missing values are interpreted by +// for a given stream. See WithMissingValueIntepretations for more information about +// missing values. +// + +// WithMissingValueIntepretations set for individual colums can override the default chosen +// with this option. +// +// For example, if you want to write +// `NULL` instead of using default values for some columns, you can set +// `default_missing_value_interpretation` to `DEFAULT_VALUE` and at the same +// time, set `missing_value_interpretations` to `NULL_VALUE` on those columns. +func WithDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption { + return func(ms *ManagedStream) { + ms.curTemplate = ms.curTemplate.revise(reviseDefaultMissingValueInterpretation(def)) } } @@ -278,8 +321,37 @@ type AppendOption func(*pendingWrite) // with a given stream. func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption { return func(pw *pendingWrite) { - // create a new descriptorVersion and attach it to the pending write. - pw.descVersion = newDescriptorVersion(schema) + prev := pw.reqTmpl + if prev == nil { + prev = newVersionedTemplate() + } + pw.reqTmpl = prev.revise(reviseProtoSchema(schema)) + } +} + +// UpdateMissingValueInterpretations updates the per-column missing-value intepretations settings, +// and is retained for subsequent writes. See the WithMissingValueInterpretations WriterOption for +// more details. +func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { + return func(pw *pendingWrite) { + prev := pw.reqTmpl + if prev == nil { + prev = newVersionedTemplate() + } + pw.reqTmpl = prev.revise(reviseMissingValueInterpretations(mvi)) + } +} + +// UpdateDefaultMissingValueInterpretations updates the default intepretations setting for the stream, +// and is retained for subsequent writes. See the WithDefaultMissingValueInterpretations WriterOption for +// more details. +func UpdateDefaultMissingValueInterpretations(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { + return func(pw *pendingWrite) { + prev := pw.reqTmpl + if prev == nil { + prev = newVersionedTemplate() + } + pw.reqTmpl = prev.revise(reviseDefaultMissingValueInterpretation(def)) } } diff --git a/bigquery/storage/managedwriter/options_test.go b/bigquery/storage/managedwriter/options_test.go index cb64d5e0b3a1..e8a64b6fdd6f 100644 --- a/bigquery/storage/managedwriter/options_test.go +++ b/bigquery/storage/managedwriter/options_test.go @@ -18,11 +18,15 @@ import ( "sync" "testing" + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/descriptorpb" ) func TestCustomClientOptions(t *testing.T) { @@ -140,6 +144,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.streamType = BufferedStream return ms @@ -151,6 +156,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.MaxInflightRequests = 2 return ms @@ -162,6 +168,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.MaxInflightBytes = 5 return ms @@ -173,6 +180,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.TraceID = "foo" return ms @@ -184,6 +192,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.destinationTable = "foo" return ms @@ -195,6 +204,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.dataOrigin = "origin" return ms @@ -206,6 +216,7 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))) @@ -218,11 +229,66 @@ func TestWriterOptions(t *testing.T) { want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } ms.retry = newStatelessRetryer() return ms }(), }, + { + desc: "WithSchemaDescriptor", + options: []WriterOption{WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")})}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), + } + ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ + Rows: &storagepb.AppendRowsRequest_ProtoRows{ + ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ + WriterSchema: &storagepb.ProtoSchema{ + ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")}, + }, + }, + }, + } + return ms + }(), + }, + { + desc: "WithDefaultMissingValueInterpretation", + options: []WriterOption{WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE)}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), + } + ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ + DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE, + } + return ms + }(), + }, + { + desc: "WithtMissingValueInterpretations", + options: []WriterOption{WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, + "bar": storagepb.AppendRowsRequest_NULL_VALUE, + })}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), + } + ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ + MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, + "bar": storagepb.AppendRowsRequest_NULL_VALUE, + }, + } + return ms + }(), + }, { desc: "multiple", options: []WriterOption{ @@ -230,10 +296,31 @@ func TestWriterOptions(t *testing.T) { WithMaxInflightBytes(5), WithTraceID("traceid"), EnableWriteRetries(true), + WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")}), + WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE), + WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, + "bar": storagepb.AppendRowsRequest_NULL_VALUE, + }), }, want: func() *ManagedStream { ms := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), + } + ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ + Rows: &storagepb.AppendRowsRequest_ProtoRows{ + ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ + WriterSchema: &storagepb.ProtoSchema{ + ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")}, + }, + }, + }, + MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, + "bar": storagepb.AppendRowsRequest_NULL_VALUE, + }, + DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE, } ms.streamSettings.MaxInflightBytes = 5 ms.streamSettings.streamType = PendingStream @@ -247,6 +334,7 @@ func TestWriterOptions(t *testing.T) { for _, tc := range testCases { got := &ManagedStream{ streamSettings: defaultStreamSettings(), + curTemplate: newVersionedTemplate(), } for _, o := range tc.options { o(got) @@ -255,8 +343,12 @@ func TestWriterOptions(t *testing.T) { if diff := cmp.Diff(got, tc.want, cmp.AllowUnexported(ManagedStream{}, streamSettings{}), cmp.AllowUnexported(sync.Mutex{}), + cmp.AllowUnexported(versionedTemplate{}), + cmpopts.IgnoreFields(versionedTemplate{}, "versionTime", "hashVal"), + protocmp.Transform(), // versionedTemplate embeds proto messages. cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" { t.Errorf("diff in case (%s):\n%v", tc.desc, diff) } + } } diff --git a/bigquery/storage/managedwriter/send_optimizer.go b/bigquery/storage/managedwriter/send_optimizer.go index a5ae1f17a1a9..200b85834902 100644 --- a/bigquery/storage/managedwriter/send_optimizer.go +++ b/bigquery/storage/managedwriter/send_optimizer.go @@ -106,15 +106,15 @@ func (so *simplexOptimizer) isMultiplexing() bool { // Schema evolution is simply a case of sending the new WriterSchema as part of the request(s). No explicit // reconnection is necessary. type multiplexOptimizer struct { - prevStream string - prevDescriptorVersion *descriptorVersion - multiplexStreams bool + prevStream string + prevTemplate *versionedTemplate + multiplexStreams bool } func (mo *multiplexOptimizer) signalReset() { mo.prevStream = "" mo.multiplexStreams = false - mo.prevDescriptorVersion = nil + mo.prevTemplate = nil } func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error { @@ -125,7 +125,7 @@ func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRow err = arc.Send(req) if err == nil { mo.prevStream = req.GetWriteStream() - mo.prevDescriptorVersion = pw.descVersion + mo.prevTemplate = pw.reqTmpl } } else { // We have a previous send. Determine if it's the same stream or a different one. @@ -137,15 +137,15 @@ func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRow // swapOnSuccess tracks if we need to update schema versions on successful send. swapOnSuccess := false req := pw.req - if mo.prevDescriptorVersion != nil { - if !mo.prevDescriptorVersion.eqVersion(pw.descVersion) { + if mo.prevTemplate != nil { + if !mo.prevTemplate.Compatible(pw.reqTmpl) { swapOnSuccess = true req = pw.constructFullRequest(false) // full request minus traceID. } } err = arc.Send(req) if err == nil && swapOnSuccess { - mo.prevDescriptorVersion = pw.descVersion + mo.prevTemplate = pw.reqTmpl } } else { // The previous send was for a different stream. Send a full request, minus traceId. @@ -154,7 +154,7 @@ func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRow if err == nil { // Send successful. Update state to reflect this send is now the "previous" state. mo.prevStream = pw.writeStreamID - mo.prevDescriptorVersion = pw.descVersion + mo.prevTemplate = pw.reqTmpl } // Also, note that we've sent traffic for multiple streams, which means the backend recognizes this // is a multiplex stream as well. @@ -168,67 +168,6 @@ func (mo *multiplexOptimizer) isMultiplexing() bool { return mo.multiplexStreams } -// getDescriptorFromAppend is a utility method for extracting the deeply nested schema -// descriptor from a request. It returns a nil if the descriptor is not set. -func getDescriptorFromAppend(req *storagepb.AppendRowsRequest) *descriptorpb.DescriptorProto { - if pr := req.GetProtoRows(); pr != nil { - if ws := pr.GetWriterSchema(); ws != nil { - return ws.GetProtoDescriptor() - } - } - return nil -} - -// descriptorVersion is used for faster comparisons of proto descriptors. Deep equality comparisons -// of DescriptorProto can be very costly, so we use a simple versioning strategy based on -// time and a crc32 hash of the serialized proto bytes. -// -// The descriptorVersion is used for retaining schema, signalling schema change and optimizing requests. -type descriptorVersion struct { - versionTime time.Time - descriptorProto *descriptorpb.DescriptorProto - hashVal uint32 -} - -func newDescriptorVersion(in *descriptorpb.DescriptorProto) *descriptorVersion { - var hashVal uint32 - // It is a known issue that we may have non-deterministic serialization of a DescriptorProto - // due to the nature of protobuf. Our primary protection is the time-based version identifier, - // this hashing is primarily for time collisions. - if b, err := proto.Marshal(in); err == nil { - hashVal = crc32.ChecksumIEEE(b) - } - return &descriptorVersion{ - versionTime: time.Now(), - descriptorProto: proto.Clone(in).(*descriptorpb.DescriptorProto), - hashVal: hashVal, - } -} - -// eqVersion is the fast equality comparison that uses the versionTime and crc32 hash -// in place of deep proto equality. -func (dv *descriptorVersion) eqVersion(other *descriptorVersion) bool { - if dv == nil || other == nil { - return false - } - if dv.versionTime != other.versionTime { - return false - } - if dv.hashVal == 0 || other.hashVal == 0 { - return false - } - if dv.hashVal != other.hashVal { - return false - } - return true -} - -// isNewer reports whether the current schema bears a newer time (version) -// than the other. -func (dv *descriptorVersion) isNewer(other *descriptorVersion) bool { - return dv.versionTime.UnixNano() > other.versionTime.UnixNano() -} - // versionedTemplate is used for faster comparison of the templated part of // an AppendRowsRequest, which bears settings-like fields related to schema // and default value configuration. Direct proto comparison through something diff --git a/bigquery/storage/managedwriter/send_optimizer_test.go b/bigquery/storage/managedwriter/send_optimizer_test.go index 5b0848b61a0a..10948c5c1022 100644 --- a/bigquery/storage/managedwriter/send_optimizer_test.go +++ b/bigquery/storage/managedwriter/send_optimizer_test.go @@ -62,11 +62,11 @@ func TestSendOptimizer(t *testing.T) { description: "verbose-optimizer", optimizer: &verboseOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -84,11 +84,11 @@ func TestSendOptimizer(t *testing.T) { description: "simplex no errors", optimizer: &simplexOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -114,11 +114,11 @@ func TestSendOptimizer(t *testing.T) { description: "simplex w/partial errors", optimizer: &simplexOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -144,11 +144,11 @@ func TestSendOptimizer(t *testing.T) { description: "multiplex single all errors", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -166,11 +166,11 @@ func TestSendOptimizer(t *testing.T) { description: "multiplex single no errors", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { - dv := newDescriptorVersion(exampleDP) + tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) return []*pendingWrite{ - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), - newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), dv, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), + newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID), } }(), sendResults: []error{ @@ -193,8 +193,8 @@ func TestSendOptimizer(t *testing.T) { description: "multiplex interleave", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { - dvA := newDescriptorVersion(exampleDP) - dvB := newDescriptorVersion(protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())) + tmplA := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) + tmplB := newVersionedTemplate().revise(reviseProtoSchema(protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor()))) reqA := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) reqA.WriteStream = "alpha" @@ -203,16 +203,16 @@ func TestSendOptimizer(t *testing.T) { reqB.WriteStream = "beta" writes := make([]*pendingWrite, 10) - writes[0] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) - writes[1] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) - writes[2] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[3] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) - writes[4] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[5] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[6] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[7] = newPendingWrite(ctx, nil, reqB, dvB, reqB.GetWriteStream(), exampleTraceID) - writes[8] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) - writes[9] = newPendingWrite(ctx, nil, reqA, dvA, reqA.GetWriteStream(), exampleTraceID) + writes[0] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) + writes[1] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) + writes[2] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[3] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) + writes[4] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[5] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[6] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[7] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID) + writes[8] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) + writes[9] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID) return writes }(), @@ -270,16 +270,16 @@ func TestSendOptimizer(t *testing.T) { description: "multiplex w/evolution", optimizer: &multiplexOptimizer{}, reqs: func() []*pendingWrite { - dvOld := newDescriptorVersion(exampleDP) - dvNew := newDescriptorVersion(&descriptorpb.DescriptorProto{Name: proto.String("new")}) + tmplOld := newVersionedTemplate().revise(reviseProtoSchema(exampleDP)) + tmplNew := tmplOld.revise(reviseProtoSchema(&descriptorpb.DescriptorProto{Name: proto.String("new")})) example := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest) writes := make([]*pendingWrite, 4) - writes[0] = newPendingWrite(ctx, nil, example, dvOld, exampleStreamID, exampleTraceID) - writes[1] = newPendingWrite(ctx, nil, example, dvOld, exampleStreamID, exampleTraceID) - writes[2] = newPendingWrite(ctx, nil, example, dvNew, exampleStreamID, exampleTraceID) - writes[3] = newPendingWrite(ctx, nil, example, dvNew, exampleStreamID, exampleTraceID) + writes[0] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID) + writes[1] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID) + writes[2] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID) + writes[3] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID) return writes }(), @@ -336,63 +336,6 @@ func TestSendOptimizer(t *testing.T) { } } -func TestDescriptorVersion_EqVersion(t *testing.T) { - - exampleDV := newDescriptorVersion(&descriptorpb.DescriptorProto{Name: proto.String("foo")}) - copiedExampleDV := &descriptorVersion{versionTime: exampleDV.versionTime, descriptorProto: &descriptorpb.DescriptorProto{Name: proto.String("foo")}, hashVal: exampleDV.hashVal} - exampleDV2 := &descriptorVersion{versionTime: exampleDV.versionTime, descriptorProto: &descriptorpb.DescriptorProto{Name: proto.String("bar")}, hashVal: exampleDV.hashVal} - - testCases := []struct { - desc string - current *descriptorVersion - other *descriptorVersion - want bool - }{ - { - desc: "both nil", - }, - { - desc: "nil current", - other: newDescriptorVersion(&descriptorpb.DescriptorProto{}), - want: false, - }, - { - desc: "nil other", - current: newDescriptorVersion(&descriptorpb.DescriptorProto{}), - want: false, - }, - { - desc: "mismatched", - current: newDescriptorVersion(&descriptorpb.DescriptorProto{}), - other: newDescriptorVersion(&descriptorpb.DescriptorProto{}), - want: false, - }, - { - desc: "equal, same reference", - current: exampleDV, - other: exampleDV, - want: true, - }, - { - desc: "equal, different references", - current: exampleDV, - other: copiedExampleDV, - want: true, - }, - { - desc: "almost equal aka a collision", - current: exampleDV, - other: exampleDV2, - want: true, - }, - } - for _, tc := range testCases { - if got := tc.current.eqVersion(tc.other); got != tc.want { - t.Errorf("case %q, got %t want %t", tc.desc, got, tc.want) - } - } -} - func TestVersionedTemplate(t *testing.T) { testCases := []struct { desc string From 44876d560234a7e78fdb4a2d30ec116c0a63e834 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 13 Oct 2023 01:48:47 +0000 Subject: [PATCH 3/9] revision improvement --- bigquery/storage/managedwriter/send_optimizer.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/bigquery/storage/managedwriter/send_optimizer.go b/bigquery/storage/managedwriter/send_optimizer.go index 200b85834902..f61595851dcf 100644 --- a/bigquery/storage/managedwriter/send_optimizer.go +++ b/bigquery/storage/managedwriter/send_optimizer.go @@ -203,10 +203,12 @@ func (vt *versionedTemplate) computeHash() { type templateRevisionF func(m *storagepb.AppendRowsRequest) -// revise makes a new versionedTemplate from the existing template, applying any changes +// revise makes a new versionedTemplate from the existing template, applying any changes. +// The original revision is returned if there's no effective difference after changes are +// applied. func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemplate { if len(changes) == 0 { - // if there's no changes, simply return the base revision + // if there's no changes, return the base revision immediately. return vt } out := &versionedTemplate{ @@ -217,6 +219,11 @@ func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemp r(out.tmpl) } out.computeHash() + if out.Compatible(vt) { + // The changes didn't yield an measured difference. Return the base revision to avoid + // possible connection churn from no-op revisions. + return vt + } return out } From 20da20f35beabfd4b8318724aaf285597fe8f01b Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 13 Oct 2023 17:10:07 +0000 Subject: [PATCH 4/9] first pass integration testing --- .../storage/managedwriter/integration_test.go | 95 +++++++- .../storage/managedwriter/managed_stream.go | 7 + bigquery/storage/managedwriter/options.go | 4 +- .../storage/managedwriter/testdata/schemas.go | 25 ++ .../managedwriter/testdata/testing.pb.go | 220 +++++++++++++++--- .../managedwriter/testdata/testing.proto | 12 + 6 files changed, 332 insertions(+), 31 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index e13439afa664..1f4b14daa072 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -259,7 +259,9 @@ func TestIntegration_ManagedWriter(t *testing.T) { t.Run("TestLargeInsertWithRetry", func(t *testing.T) { testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset) }) - + t.Run("DefaultValueHandling", func(t *testing.T) { + testDefaultValueHandling(ctx, t, mwClient, bqClient, dataset) + }) }) } @@ -1262,6 +1264,97 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq ) } +func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) { + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.DefaultValueSchema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + + m := &testdata.DefaultValuesPartialSchema{ + // We only popu + Id: proto.String("someval"), + } + var data []byte + var err error + if data, err = proto.Marshal(m); err != nil { + t.Fatalf("failed to marshal test row data") + } + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. + opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID))) + opts = append(opts, WithSchemaDescriptor(descriptorProto)) + ms, err := mwClient.NewManagedStream(ctx, opts...) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + validateTableConstraints(ctx, t, bqClient, testTable, "before send", + withExactRowCount(0)) + + var result *AppendResult + + // Send one row, verify default values were set as expected. + + result, err = ms.AppendRows(ctx, [][]byte{data}) + if err != nil { + t.Errorf("append failed: %v", err) + } + // Wait for the result to indicate ready, then validate. + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after first row", + withExactRowCount(1), + withNonNullCount("id", 1), + withNullCount("strcol", 1), + withNonNullCount("strcol_withdef", 1), + withNullCount("intcol", 1), + withNonNullCount("intcol_withdef", 1)) + + // Change default MVI to use nulls + result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_NULL_VALUE)) + if err != nil { + t.Errorf("append failed: %v", err) + } + // Wait for the result to indicate ready, then validate. + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi)", + withExactRowCount(2), + withNullCount("strcol", 2), + withNullCount("strcol_withdef", 1), + withNullCount("intcol", 2), + withNullCount("strcol_withdef", 1), + ) + + // Change per-column MVI to use default value + result, err = ms.AppendRows(ctx, [][]byte{data}, + UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ + "strcol_withdefault": storagepb.AppendRowsRequest_NULL_VALUE, + })) + if err != nil { + t.Errorf("append failed: %v", err) + } + // Wait for the result to indicate ready, then validate. + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi)", + withExactRowCount(3), + withNullCount("strcol", 3), + withNullCount("strcol_withdef", 1), + withNullCount("intcol", 3), + withNullCount("strcol_withdef", 2), + ) +} + func TestIntegration_DetectProjectID(t *testing.T) { ctx := context.Background() testCreds := testutil.Credentials(ctx) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index ee73e32502b9..090933d7e706 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -305,6 +305,13 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ... for _, opt := range opts { opt(pw) } + // Post-request fixup after options are applied. + if pw.reqTmpl != nil { + if pw.reqTmpl.tmpl != nil { + // MVIs must be set on each request, but _default_ MVIs persist across the stream lifetime. Sigh. + pw.req.MissingValueInterpretations = pw.reqTmpl.tmpl.GetMissingValueInterpretations() + } + } // Call the underlying append. The stream has it's own retained context and will surface expiry on // it's own, but we also need to respect any deadline for the provided context. diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 4181204bb1b9..6abecdde4d9b 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -342,10 +342,10 @@ func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsReques } } -// UpdateDefaultMissingValueInterpretations updates the default intepretations setting for the stream, +// UpdateDefaultMissingValueInterpretation updates the default intepretations setting for the stream, // and is retained for subsequent writes. See the WithDefaultMissingValueInterpretations WriterOption for // more details. -func UpdateDefaultMissingValueInterpretations(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { +func UpdateDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { return func(pw *pendingWrite) { prev := pw.reqTmpl if prev == nil { diff --git a/bigquery/storage/managedwriter/testdata/schemas.go b/bigquery/storage/managedwriter/testdata/schemas.go index d1d73dd2557f..8135ad8fd368 100644 --- a/bigquery/storage/managedwriter/testdata/schemas.go +++ b/bigquery/storage/managedwriter/testdata/schemas.go @@ -296,4 +296,29 @@ var ( Type: bigquery.IntegerFieldType, }, } + + DefaultValueSchema bigquery.Schema = bigquery.Schema{ + { + Name: "id", + Type: bigquery.StringFieldType, + }, + { + Name: "strcol", + Type: bigquery.StringFieldType, + }, + { + Name: "strcol_withdef", + Type: bigquery.StringFieldType, + DefaultValueExpression: "\"defaultvalue\"", + }, + { + Name: "intcol", + Type: bigquery.IntegerFieldType, + }, + { + Name: "intcol_withdef", + Type: bigquery.IntegerFieldType, + DefaultValueExpression: "-99", + }, + } ) diff --git a/bigquery/storage/managedwriter/testdata/testing.pb.go b/bigquery/storage/managedwriter/testdata/testing.pb.go index 75ae418b7d4c..fafa7f6b9676 100644 --- a/bigquery/storage/managedwriter/testdata/testing.pb.go +++ b/bigquery/storage/managedwriter/testdata/testing.pb.go @@ -14,19 +14,18 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.27.1 +// protoc-gen-go v1.28.0 // protoc v3.17.3 // source: testing.proto package testdata import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" + reflect "reflect" + sync "sync" ) const ( @@ -660,6 +659,132 @@ func (*WithOneOf_StringValue) isWithOneOf_OneofValue() {} func (*WithOneOf_DoubleValue) isWithOneOf_OneofValue() {} +type DefaultValues struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Strcol *string `protobuf:"bytes,2,opt,name=strcol" json:"strcol,omitempty"` + StrcolWithdef *string `protobuf:"bytes,3,opt,name=strcol_withdef,json=strcolWithdef" json:"strcol_withdef,omitempty"` + Intcol *int64 `protobuf:"varint,4,opt,name=intcol" json:"intcol,omitempty"` + IntcolWithdef *int64 `protobuf:"varint,5,opt,name=intcol_withdef,json=intcolWithdef" json:"intcol_withdef,omitempty"` +} + +func (x *DefaultValues) Reset() { + *x = DefaultValues{} + if protoimpl.UnsafeEnabled { + mi := &file_testing_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DefaultValues) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DefaultValues) ProtoMessage() {} + +func (x *DefaultValues) ProtoReflect() protoreflect.Message { + mi := &file_testing_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DefaultValues.ProtoReflect.Descriptor instead. +func (*DefaultValues) Descriptor() ([]byte, []int) { + return file_testing_proto_rawDescGZIP(), []int{9} +} + +func (x *DefaultValues) GetId() string { + if x != nil && x.Id != nil { + return *x.Id + } + return "" +} + +func (x *DefaultValues) GetStrcol() string { + if x != nil && x.Strcol != nil { + return *x.Strcol + } + return "" +} + +func (x *DefaultValues) GetStrcolWithdef() string { + if x != nil && x.StrcolWithdef != nil { + return *x.StrcolWithdef + } + return "" +} + +func (x *DefaultValues) GetIntcol() int64 { + if x != nil && x.Intcol != nil { + return *x.Intcol + } + return 0 +} + +func (x *DefaultValues) GetIntcolWithdef() int64 { + if x != nil && x.IntcolWithdef != nil { + return *x.IntcolWithdef + } + return 0 +} + +type DefaultValuesPartialSchema struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` +} + +func (x *DefaultValuesPartialSchema) Reset() { + *x = DefaultValuesPartialSchema{} + if protoimpl.UnsafeEnabled { + mi := &file_testing_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DefaultValuesPartialSchema) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DefaultValuesPartialSchema) ProtoMessage() {} + +func (x *DefaultValuesPartialSchema) ProtoReflect() protoreflect.Message { + mi := &file_testing_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DefaultValuesPartialSchema.ProtoReflect.Descriptor instead. +func (*DefaultValuesPartialSchema) Descriptor() ([]byte, []int) { + return file_testing_proto_rawDescGZIP(), []int{10} +} + +func (x *DefaultValuesPartialSchema) GetId() string { + if x != nil && x.Id != nil { + return *x.Id + } + return "" +} + var File_testing_proto protoreflect.FileDescriptor var file_testing_proto_rawDesc = []byte{ @@ -741,14 +866,27 @@ var file_testing_proto_rawDesc = []byte{ 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x23, 0x0a, 0x0c, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x01, 0x48, 0x00, 0x52, 0x0b, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x0d, 0x0a, - 0x0b, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x2a, 0x28, 0x0a, 0x08, - 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, - 0x45, 0x6e, 0x75, 0x6d, 0x30, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, 0x45, - 0x6e, 0x75, 0x6d, 0x31, 0x10, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2f, 0x62, 0x69, - 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6d, - 0x61, 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, - 0x74, 0x64, 0x61, 0x74, 0x61, + 0x0b, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x9d, 0x01, 0x0a, + 0x0d, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x16, + 0x0a, 0x06, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, + 0x5f, 0x77, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, + 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x12, 0x16, 0x0a, + 0x06, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x69, + 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x5f, + 0x77, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x69, + 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x22, 0x2c, 0x0a, 0x1a, + 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x2a, 0x28, 0x0a, 0x08, 0x54, 0x65, + 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, + 0x75, 0x6d, 0x30, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, + 0x6d, 0x31, 0x10, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2f, 0x62, 0x69, 0x67, 0x71, + 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6d, 0x61, 0x6e, + 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, + 0x61, 0x74, 0x61, } var ( @@ -764,25 +902,27 @@ func file_testing_proto_rawDescGZIP() []byte { } var file_testing_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_testing_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_testing_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_testing_proto_goTypes = []interface{}{ - (TestEnum)(0), // 0: testdata.TestEnum - (*AllSupportedTypes)(nil), // 1: testdata.AllSupportedTypes - (*WithWellKnownTypes)(nil), // 2: testdata.WithWellKnownTypes - (*InnerType)(nil), // 3: testdata.InnerType - (*NestedType)(nil), // 4: testdata.NestedType - (*ComplexType)(nil), // 5: testdata.ComplexType - (*ContainsRecursive)(nil), // 6: testdata.ContainsRecursive - (*RecursiveType)(nil), // 7: testdata.RecursiveType - (*RecursiveTypeTopMessage)(nil), // 8: testdata.RecursiveTypeTopMessage - (*WithOneOf)(nil), // 9: testdata.WithOneOf - (*wrapperspb.Int64Value)(nil), // 10: google.protobuf.Int64Value - (*wrapperspb.StringValue)(nil), // 11: google.protobuf.StringValue + (TestEnum)(0), // 0: testdata.TestEnum + (*AllSupportedTypes)(nil), // 1: testdata.AllSupportedTypes + (*WithWellKnownTypes)(nil), // 2: testdata.WithWellKnownTypes + (*InnerType)(nil), // 3: testdata.InnerType + (*NestedType)(nil), // 4: testdata.NestedType + (*ComplexType)(nil), // 5: testdata.ComplexType + (*ContainsRecursive)(nil), // 6: testdata.ContainsRecursive + (*RecursiveType)(nil), // 7: testdata.RecursiveType + (*RecursiveTypeTopMessage)(nil), // 8: testdata.RecursiveTypeTopMessage + (*WithOneOf)(nil), // 9: testdata.WithOneOf + (*DefaultValues)(nil), // 10: testdata.DefaultValues + (*DefaultValuesPartialSchema)(nil), // 11: testdata.DefaultValuesPartialSchema + (*wrapperspb.Int64Value)(nil), // 12: google.protobuf.Int64Value + (*wrapperspb.StringValue)(nil), // 13: google.protobuf.StringValue } var file_testing_proto_depIdxs = []int32{ 0, // 0: testdata.AllSupportedTypes.enum_value:type_name -> testdata.TestEnum - 10, // 1: testdata.WithWellKnownTypes.wrapped_int64:type_name -> google.protobuf.Int64Value - 11, // 2: testdata.WithWellKnownTypes.wrapped_string:type_name -> google.protobuf.StringValue + 12, // 1: testdata.WithWellKnownTypes.wrapped_int64:type_name -> google.protobuf.Int64Value + 13, // 2: testdata.WithWellKnownTypes.wrapped_string:type_name -> google.protobuf.StringValue 3, // 3: testdata.NestedType.inner_type:type_name -> testdata.InnerType 4, // 4: testdata.ComplexType.nested_repeated_type:type_name -> testdata.NestedType 3, // 5: testdata.ComplexType.inner_type:type_name -> testdata.InnerType @@ -910,6 +1050,30 @@ func file_testing_proto_init() { return nil } } + file_testing_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DefaultValues); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_testing_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DefaultValuesPartialSchema); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_testing_proto_msgTypes[8].OneofWrappers = []interface{}{ (*WithOneOf_StringValue)(nil), @@ -921,7 +1085,7 @@ func file_testing_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_testing_proto_rawDesc, NumEnums: 1, - NumMessages: 9, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/bigquery/storage/managedwriter/testdata/testing.proto b/bigquery/storage/managedwriter/testdata/testing.proto index 36f676521309..baf0205244a6 100644 --- a/bigquery/storage/managedwriter/testdata/testing.proto +++ b/bigquery/storage/managedwriter/testdata/testing.proto @@ -73,4 +73,16 @@ message WithOneOf { string string_value = 2; double double_value = 3; } +} + +message DefaultValues { + optional string id = 1; + optional string strcol = 2; + optional string strcol_withdef = 3; + optional int64 intcol = 4; + optional int64 intcol_withdef = 5; +} + +message DefaultValuesPartialSchema { + optional string id = 1; } \ No newline at end of file From 69d7fa6414d6bc48739d1b4c6e5eca9e24a8d4b6 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 13 Oct 2023 20:22:09 +0000 Subject: [PATCH 5/9] update default value integration test --- .../storage/managedwriter/integration_test.go | 36 +++---- .../storage/managedwriter/testdata/schemas.go | 9 ++ .../managedwriter/testdata/testing.pb.go | 96 +++++++++++++++---- .../managedwriter/testdata/testing.proto | 6 ++ 4 files changed, 111 insertions(+), 36 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 1f4b14daa072..790db3a82f07 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -1308,13 +1308,14 @@ func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Clien validateTableConstraints(ctx, t, bqClient, testTable, "after first row", withExactRowCount(1), withNonNullCount("id", 1), - withNullCount("strcol", 1), - withNonNullCount("strcol_withdef", 1), - withNullCount("intcol", 1), - withNonNullCount("intcol_withdef", 1)) + withNullCount("strcol_withdef", 1), + withNullCount("intcol_withdef", 1), + withNullCount("otherstr_withdef", 0)) // not part of partial schema - // Change default MVI to use nulls - result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_NULL_VALUE)) + // Change default MVI to use nulls. + // We expect the fields in the partial schema to leverage nulls rather than default values. + // The fields outside the partial schema continue to obey default values. + result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE)) if err != nil { t.Errorf("append failed: %v", err) } @@ -1324,18 +1325,15 @@ func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Clien t.Errorf("error on append: %v", err) } - validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi)", + validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi is DEFAULT_VALUE)", withExactRowCount(2), - withNullCount("strcol", 2), - withNullCount("strcol_withdef", 1), - withNullCount("intcol", 2), - withNullCount("strcol_withdef", 1), - ) + withNullCount("strcol_withdef", 1), // doesn't increment, as it gets default value + withNullCount("intcol_withdef", 1)) // doesn't increment, as it gets default value // Change per-column MVI to use default value result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ - "strcol_withdefault": storagepb.AppendRowsRequest_NULL_VALUE, + "strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE, })) if err != nil { t.Errorf("append failed: %v", err) @@ -1346,12 +1344,14 @@ func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Clien t.Errorf("error on append: %v", err) } - validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi)", + validateTableConstraints(ctx, t, bqClient, testTable, "after third row (explicit column mvi)", withExactRowCount(3), - withNullCount("strcol", 3), - withNullCount("strcol_withdef", 1), - withNullCount("intcol", 3), - withNullCount("strcol_withdef", 2), + withNullCount("strcol_withdef", 2), // increments as it's null for this column + withNullCount("intcol_withdef", 1), // doesn't increment, still default value + withNonNullCount("otherstr_withdef", 3), // not part of descriptor, always gets default value + withNullCount("otherstr", 3), // not part of descriptor, always gets null + withNullCount("strcol", 3), // no default value defined, always gets null + withNullCount("intcol", 3), // no default value defined, always gets null ) } diff --git a/bigquery/storage/managedwriter/testdata/schemas.go b/bigquery/storage/managedwriter/testdata/schemas.go index 8135ad8fd368..8d949a288196 100644 --- a/bigquery/storage/managedwriter/testdata/schemas.go +++ b/bigquery/storage/managedwriter/testdata/schemas.go @@ -320,5 +320,14 @@ var ( Type: bigquery.IntegerFieldType, DefaultValueExpression: "-99", }, + { + Name: "otherstr", + Type: bigquery.StringFieldType, + }, + { + Name: "otherstr_withdef", + Type: bigquery.StringFieldType, + DefaultValueExpression: "\"otherval\"", + }, } ) diff --git a/bigquery/storage/managedwriter/testdata/testing.pb.go b/bigquery/storage/managedwriter/testdata/testing.pb.go index fafa7f6b9676..984280257940 100644 --- a/bigquery/storage/managedwriter/testdata/testing.pb.go +++ b/bigquery/storage/managedwriter/testdata/testing.pb.go @@ -664,11 +664,13 @@ type DefaultValues struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` - Strcol *string `protobuf:"bytes,2,opt,name=strcol" json:"strcol,omitempty"` - StrcolWithdef *string `protobuf:"bytes,3,opt,name=strcol_withdef,json=strcolWithdef" json:"strcol_withdef,omitempty"` - Intcol *int64 `protobuf:"varint,4,opt,name=intcol" json:"intcol,omitempty"` - IntcolWithdef *int64 `protobuf:"varint,5,opt,name=intcol_withdef,json=intcolWithdef" json:"intcol_withdef,omitempty"` + Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Strcol *string `protobuf:"bytes,2,opt,name=strcol" json:"strcol,omitempty"` + StrcolWithdef *string `protobuf:"bytes,3,opt,name=strcol_withdef,json=strcolWithdef" json:"strcol_withdef,omitempty"` + Intcol *int64 `protobuf:"varint,4,opt,name=intcol" json:"intcol,omitempty"` + IntcolWithdef *int64 `protobuf:"varint,5,opt,name=intcol_withdef,json=intcolWithdef" json:"intcol_withdef,omitempty"` + Otherstr *string `protobuf:"bytes,6,opt,name=otherstr" json:"otherstr,omitempty"` + OtherstrDefault *string `protobuf:"bytes,7,opt,name=otherstr_default,json=otherstrDefault" json:"otherstr_default,omitempty"` } func (x *DefaultValues) Reset() { @@ -738,12 +740,30 @@ func (x *DefaultValues) GetIntcolWithdef() int64 { return 0 } +func (x *DefaultValues) GetOtherstr() string { + if x != nil && x.Otherstr != nil { + return *x.Otherstr + } + return "" +} + +func (x *DefaultValues) GetOtherstrDefault() string { + if x != nil && x.OtherstrDefault != nil { + return *x.OtherstrDefault + } + return "" +} + type DefaultValuesPartialSchema struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Strcol *string `protobuf:"bytes,2,opt,name=strcol" json:"strcol,omitempty"` + StrcolWithdef *string `protobuf:"bytes,3,opt,name=strcol_withdef,json=strcolWithdef" json:"strcol_withdef,omitempty"` + Intcol *int64 `protobuf:"varint,4,opt,name=intcol" json:"intcol,omitempty"` + IntcolWithdef *int64 `protobuf:"varint,5,opt,name=intcol_withdef,json=intcolWithdef" json:"intcol_withdef,omitempty"` } func (x *DefaultValuesPartialSchema) Reset() { @@ -785,6 +805,34 @@ func (x *DefaultValuesPartialSchema) GetId() string { return "" } +func (x *DefaultValuesPartialSchema) GetStrcol() string { + if x != nil && x.Strcol != nil { + return *x.Strcol + } + return "" +} + +func (x *DefaultValuesPartialSchema) GetStrcolWithdef() string { + if x != nil && x.StrcolWithdef != nil { + return *x.StrcolWithdef + } + return "" +} + +func (x *DefaultValuesPartialSchema) GetIntcol() int64 { + if x != nil && x.Intcol != nil { + return *x.Intcol + } + return 0 +} + +func (x *DefaultValuesPartialSchema) GetIntcolWithdef() int64 { + if x != nil && x.IntcolWithdef != nil { + return *x.IntcolWithdef + } + return 0 +} + var File_testing_proto protoreflect.FileDescriptor var file_testing_proto_rawDesc = []byte{ @@ -866,7 +914,7 @@ var file_testing_proto_rawDesc = []byte{ 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x23, 0x0a, 0x0c, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x01, 0x48, 0x00, 0x52, 0x0b, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x0d, 0x0a, - 0x0b, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x9d, 0x01, 0x0a, + 0x0b, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xe4, 0x01, 0x0a, 0x0d, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, @@ -876,17 +924,29 @@ var file_testing_proto_rawDesc = []byte{ 0x06, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x5f, 0x77, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x69, - 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x22, 0x2c, 0x0a, 0x1a, - 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x2a, 0x28, 0x0a, 0x08, 0x54, 0x65, - 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, - 0x75, 0x6d, 0x30, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, - 0x6d, 0x31, 0x10, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2f, 0x62, 0x69, 0x67, 0x71, - 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6d, 0x61, 0x6e, - 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, - 0x61, 0x74, 0x61, + 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x12, 0x1a, 0x0a, 0x08, + 0x6f, 0x74, 0x68, 0x65, 0x72, 0x73, 0x74, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x6f, 0x74, 0x68, 0x65, 0x72, 0x73, 0x74, 0x72, 0x12, 0x29, 0x0a, 0x10, 0x6f, 0x74, 0x68, 0x65, + 0x72, 0x73, 0x74, 0x72, 0x5f, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x18, 0x07, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0f, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x73, 0x74, 0x72, 0x44, 0x65, 0x66, 0x61, + 0x75, 0x6c, 0x74, 0x22, 0xaa, 0x01, 0x0a, 0x1a, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x56, + 0x61, 0x6c, 0x75, 0x65, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, + 0x6d, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x74, + 0x72, 0x63, 0x6f, 0x6c, 0x5f, 0x77, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x73, 0x74, 0x72, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, + 0x66, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x06, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x69, 0x6e, 0x74, + 0x63, 0x6f, 0x6c, 0x5f, 0x77, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0d, 0x69, 0x6e, 0x74, 0x63, 0x6f, 0x6c, 0x57, 0x69, 0x74, 0x68, 0x64, 0x65, 0x66, + 0x2a, 0x28, 0x0a, 0x08, 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x12, 0x0d, 0x0a, 0x09, + 0x54, 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x30, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x54, + 0x65, 0x73, 0x74, 0x45, 0x6e, 0x75, 0x6d, 0x31, 0x10, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, + 0x6f, 0x2f, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, + 0x67, 0x65, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, + 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, } var ( diff --git a/bigquery/storage/managedwriter/testdata/testing.proto b/bigquery/storage/managedwriter/testdata/testing.proto index baf0205244a6..79055b0a64f6 100644 --- a/bigquery/storage/managedwriter/testdata/testing.proto +++ b/bigquery/storage/managedwriter/testdata/testing.proto @@ -81,8 +81,14 @@ message DefaultValues { optional string strcol_withdef = 3; optional int64 intcol = 4; optional int64 intcol_withdef = 5; + optional string otherstr = 6; + optional string otherstr_default = 7; } message DefaultValuesPartialSchema { optional string id = 1; + optional string strcol = 2; + optional string strcol_withdef = 3; + optional int64 intcol = 4; + optional int64 intcol_withdef = 5; } \ No newline at end of file From 131555e05fb8f6d19a7623f63cca3f4d9fb3a71f Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 13 Oct 2023 22:11:24 +0000 Subject: [PATCH 6/9] goimports --- bigquery/storage/managedwriter/testdata/testing.pb.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bigquery/storage/managedwriter/testdata/testing.pb.go b/bigquery/storage/managedwriter/testdata/testing.pb.go index 984280257940..992da63955b3 100644 --- a/bigquery/storage/managedwriter/testdata/testing.pb.go +++ b/bigquery/storage/managedwriter/testdata/testing.pb.go @@ -21,11 +21,12 @@ package testdata import ( + reflect "reflect" + sync "sync" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" - reflect "reflect" - sync "sync" ) const ( From 12e0d01ae332d8abebbf669561efd10ace1c165f Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 13 Oct 2023 22:21:28 +0000 Subject: [PATCH 7/9] comment cleanup --- bigquery/storage/managedwriter/options.go | 1 - 1 file changed, 1 deletion(-) diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 6abecdde4d9b..735b5bef7d5f 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -267,7 +267,6 @@ func WithMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_ // for a given stream. See WithMissingValueIntepretations for more information about // missing values. // - // WithMissingValueIntepretations set for individual colums can override the default chosen // with this option. // From 5e08f2fc28f28bf092e7bbffbc4cb21fd113e43c Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 20 Oct 2023 22:24:23 +0000 Subject: [PATCH 8/9] address reviewer feedback --- bigquery/storage/managedwriter/options.go | 35 ++++++++----------- .../storage/managedwriter/send_optimizer.go | 12 ++++--- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 735b5bef7d5f..18f988123843 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -239,7 +239,7 @@ func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption { } } -// WithMissingValueInterpretations controls over how missing values are interpreted +// WithMissingValueInterpretations controls how missing values are interpreted // for individual columns. // // You must provide a map to indicate how to interpret missing value for some fields. Missing @@ -247,13 +247,18 @@ func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption { // the field name. The value is the interpretation of missing values for the // field. // -// For example, a map {'foo': NULL_VALUE, 'bar': DEFAULT_VALUE} means all -// missing values in field foo are interpreted as NULL, all missing values in -// field bar are interpreted as the default value of field bar in table -// schema. +// For example, the following option would indicate that missing values in the "foo" +// column are interpreted as null, whereas missing values in the "bar" column are +// treated as the default value: +// +// WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ +// "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, +// "bar": storagepb.AppendRowsRequest_NULL_VALUE, +// }) // // If a field is not in this map and has missing values, the missing values -// in this field are interpreted as NULL. +// in this field are interpreted as NULL unless overridden with a default missing +// value interpretation. // // Currently, field name can only be top-level column name, can't be a struct // field path like 'foo.bar'. @@ -320,11 +325,7 @@ type AppendOption func(*pendingWrite) // with a given stream. func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption { return func(pw *pendingWrite) { - prev := pw.reqTmpl - if prev == nil { - prev = newVersionedTemplate() - } - pw.reqTmpl = prev.revise(reviseProtoSchema(schema)) + pw.reqTmpl = pw.reqTmpl.revise(reviseProtoSchema(schema)) } } @@ -333,11 +334,7 @@ func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption { // more details. func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { return func(pw *pendingWrite) { - prev := pw.reqTmpl - if prev == nil { - prev = newVersionedTemplate() - } - pw.reqTmpl = prev.revise(reviseMissingValueInterpretations(mvi)) + pw.reqTmpl = pw.reqTmpl.revise(reviseMissingValueInterpretations(mvi)) } } @@ -346,11 +343,7 @@ func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsReques // more details. func UpdateDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption { return func(pw *pendingWrite) { - prev := pw.reqTmpl - if prev == nil { - prev = newVersionedTemplate() - } - pw.reqTmpl = prev.revise(reviseDefaultMissingValueInterpretation(def)) + pw.reqTmpl = pw.reqTmpl.revise(reviseDefaultMissingValueInterpretation(def)) } } diff --git a/bigquery/storage/managedwriter/send_optimizer.go b/bigquery/storage/managedwriter/send_optimizer.go index f61595851dcf..8feb9d2dee3e 100644 --- a/bigquery/storage/managedwriter/send_optimizer.go +++ b/bigquery/storage/managedwriter/send_optimizer.go @@ -207,22 +207,26 @@ type templateRevisionF func(m *storagepb.AppendRowsRequest) // The original revision is returned if there's no effective difference after changes are // applied. func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemplate { + before := vt + if before == nil { + before = newVersionedTemplate() + } if len(changes) == 0 { // if there's no changes, return the base revision immediately. - return vt + return before } out := &versionedTemplate{ versionTime: time.Now(), - tmpl: proto.Clone(vt.tmpl).(*storagepb.AppendRowsRequest), + tmpl: proto.Clone(before.tmpl).(*storagepb.AppendRowsRequest), } for _, r := range changes { r(out.tmpl) } out.computeHash() - if out.Compatible(vt) { + if out.Compatible(before) { // The changes didn't yield an measured difference. Return the base revision to avoid // possible connection churn from no-op revisions. - return vt + return before } return out } From ee280d085d6405c4d4ccd8ec499d87c5ceb8655c Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Tue, 24 Oct 2023 02:10:57 +0000 Subject: [PATCH 9/9] comment fix --- bigquery/storage/managedwriter/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 790db3a82f07..03970787bdc2 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -1271,7 +1271,7 @@ func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Clien } m := &testdata.DefaultValuesPartialSchema{ - // We only popu + // We only populate the id, as remaining fields are used to test default values. Id: proto.String("someval"), } var data []byte