Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): correct reconnection logic (#8164)
Browse files Browse the repository at this point in the history
Signalling for an AppendRows stream when schema changes is predicated on
the backend's status for the connection.  For a simplex
(non-multiplexed) connection, the expectation is the client closes and
reconnects to signal there's a change in the schema.

For a connection in multiplex mode, no reconnection is necessary and the
backend will look at the schema for changes.

In managedwriter, we allow a user to specify multiplex at the outset,
but for connections that haven't actually sent writes for more than a
single stream ID the backend doesn't recognize the multiplex status.

This PR expands the interface for send optimizer to signal whether the
optimizer has sent writes for multiple connections, and uses it when
making the determination about schema-based reconnects.  It also
augments the schema evolution test to validate using multiple
combinations of writer and client options.
  • Loading branch information
shollyman authored Jun 23, 2023
1 parent b051b36 commit a67d53d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 13 deletions.
6 changes: 5 additions & 1 deletion bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,12 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
forceReconnect := false
if pw.writer != nil && pw.descVersion != nil && pw.descVersion.isNewer(pw.writer.curDescVersion) {
pw.writer.curDescVersion = pw.descVersion
if !canMultiplex(pw.writeStreamID) {
if co.optimizer == nil {
forceReconnect = true
} else {
if !co.optimizer.isMultiplexing() {
forceReconnect = true
}
}
}

Expand Down
68 changes: 56 additions & 12 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,6 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testPendingStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("SchemaEvolution", func(t *testing.T) {
t.Parallel()
testSchemaEvolution(ctx, t, mwClient, bqClient, dataset)
})
t.Run("SimpleCDC", func(t *testing.T) {
t.Parallel()
testSimpleCDC(ctx, t, mwClient, bqClient, dataset)
Expand All @@ -267,6 +263,56 @@ func TestIntegration_ManagedWriter(t *testing.T) {
})
}

func TestIntegration_SchemaEvolution(t *testing.T) {

testcases := []struct {
desc string
clientOpts []option.ClientOption
writerOpts []WriterOption
}{
{
desc: "Simplex_Committed",
writerOpts: []WriterOption{
WithType(CommittedStream),
},
},
{
desc: "Simplex_Default",
writerOpts: []WriterOption{
WithType(DefaultStream),
},
},
{
desc: "Multiplex_Default",
clientOpts: []option.ClientOption{
WithMultiplexing(),
WithMultiplexPoolLimit(2),
},
writerOpts: []WriterOption{
WithType(DefaultStream),
},
},
}

for _, tc := range testcases {
mwClient, bqClient := getTestClients(context.Background(), t, tc.clientOpts...)
defer mwClient.Close()
defer bqClient.Close()

dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1")
if err != nil {
t.Fatalf("failed to init test dataset: %v", err)
}
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
t.Run(tc.desc, func(t *testing.T) {
testSchemaEvolution(ctx, t, mwClient, bqClient, dataset, tc.writerOpts...)
})
}
}

func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
Expand Down Expand Up @@ -1094,7 +1140,7 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
}
}

func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
Expand All @@ -1104,11 +1150,9 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithSchemaDescriptor(descriptorProto),
WithType(CommittedStream),
)
opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
opts = append(opts, WithSchemaDescriptor(descriptorProto))
ms, err := mwClient.NewManagedStream(ctx, opts...)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
Expand Down Expand Up @@ -1154,7 +1198,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
// this subjects us to a possible race, as the backend that services GetWriteStream isn't necessarily the
// one in charge of the stream, and thus may report ready early.
for {
resp, err := ms.AppendRows(ctx, [][]byte{latestRow}, WithOffset(curOffset))
resp, err := ms.AppendRows(ctx, [][]byte{latestRow})
if err != nil {
t.Errorf("got error on dupe append: %v", err)
break
Expand All @@ -1181,7 +1225,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
t.Errorf("failed to marshal evolved message: %v", err)
}
// Send an append with an evolved schema
res, err := ms.AppendRows(ctx, [][]byte{b}, WithOffset(curOffset), UpdateSchemaDescriptor(descriptorProto))
res, err := ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
if err != nil {
t.Errorf("failed evolved append: %v", err)
}
Expand Down
22 changes: 22 additions & 0 deletions bigquery/storage/managedwriter/send_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type sendOptimizer interface {

// optimizeSend handles possible manipulation of a request, and triggers the send.
optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error

// isMultiplexing tracks if we've actually sent writes to more than a single stream on this connection.
isMultiplexing() bool
}

// verboseOptimizer is a primarily a testing optimizer that always sends the full request.
Expand All @@ -50,6 +53,11 @@ func (vo *verboseOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsC
return arc.Send(pw.constructFullRequest(true))
}

func (vo *verboseOptimizer) isMultiplexing() bool {
// we declare this no to ensure we always reconnect on schema changes.
return false
}

// simplexOptimizer is used for connections bearing AppendRowsRequest for only a single stream.
//
// The optimizations here are straightforward:
Expand Down Expand Up @@ -80,6 +88,11 @@ func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsC
return err
}

func (so *simplexOptimizer) isMultiplexing() bool {
// A simplex optimizer is not designed for multiplexing.
return false
}

// multiplexOptimizer is used for connections where requests for multiple default streams are sent on a common
// connection. Only default streams can currently be multiplexed.
//
Expand All @@ -93,10 +106,12 @@ func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsC
type multiplexOptimizer struct {
prevStream string
prevDescriptorVersion *descriptorVersion
multiplexStreams bool
}

func (mo *multiplexOptimizer) signalReset() {
mo.prevStream = ""
mo.multiplexStreams = false
mo.prevDescriptorVersion = nil
}

Expand Down Expand Up @@ -139,11 +154,18 @@ func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRow
mo.prevStream = pw.writeStreamID
mo.prevDescriptorVersion = pw.descVersion
}
// Also, note that we've sent traffic for multiple streams, which means the backend recognizes this
// is a multiplex stream as well.
mo.multiplexStreams = true
}
}
return err
}

func (mo *multiplexOptimizer) isMultiplexing() bool {
return mo.multiplexStreams
}

// getDescriptorFromAppend is a utility method for extracting the deeply nested schema
// descriptor from a request. It returns a nil if the descriptor is not set.
func getDescriptorFromAppend(req *storagepb.AppendRowsRequest) *descriptorpb.DescriptorProto {
Expand Down

0 comments on commit a67d53d

Please sign in to comment.