diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 374ed573273f..5c2129cb7723 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -73,7 +73,7 @@ func (c *Client) Close() error { } var ( - idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted,codes.Internal} + idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted, codes.Internal} isIdempotentRetryCode = make(map[codes.Code]bool) retryOptions = []gax.CallOption{ gax.WithDelayTimeoutSettings(100*time.Millisecond, 2000*time.Millisecond, 1.2), @@ -120,7 +120,7 @@ func (c *Client) Open(table string) *Table { // By default, the yielded rows will contain all values in all cells. // Use RowFilter to limit the cells returned. func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error { - ctx = mergeMetadata(ctx, t.md) + ctx = mergeOutgoingMetadata(ctx, t.md) var prevRowKey string err := gax.Invoke(ctx, func(ctx context.Context) error { @@ -337,7 +337,7 @@ func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet { } func (r RowRangeList) valid() bool { - for _, rr := range r { + for _, rr := range r { if rr.valid() { return true } @@ -420,7 +420,7 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool { // Apply applies a Mutation to a specific row. func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error { - ctx = mergeMetadata(ctx, t.md) + ctx = mergeOutgoingMetadata(ctx, t.md) after := func(res proto.Message) { for _, o := range opts { o.after(res) @@ -582,7 +582,7 @@ type entryErr struct { // // Conditional mutations cannot be applied in bulk and providing one will result in an error. func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) ([]error, error) { - ctx = mergeMetadata(ctx, t.md) + ctx = mergeOutgoingMetadata(ctx, t.md) if len(rowKeys) != len(muts) { return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts)) } @@ -711,13 +711,13 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp { if ts == ServerTime { return ts } - return ts - ts % 1000 + return ts - ts%1000 } // ApplyReadModifyWrite applies a ReadModifyWrite to a specific row. // It returns the newly written cells. func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) { - ctx = mergeMetadata(ctx, t.md) + ctx = mergeOutgoingMetadata(ctx, t.md) req := &btpb.ReadModifyWriteRowRequest{ TableName: t.c.fullTableName(t.table), RowKey: []byte(row), @@ -774,9 +774,9 @@ func (m *ReadModifyWrite) Increment(family, column string, delta int64) { }) } -// mergeMetadata returns a context populated by the existing metadata, if any, -// joined with internal metadata. -func mergeMetadata(ctx context.Context, md metadata.MD) context.Context { - mdCopy, _ := metadata.FromContext(ctx) - return metadata.NewContext(ctx, metadata.Join(mdCopy, md)) +// mergeOutgoingMetadata returns a context populated by the existing outgoing metadata, +// if any, joined with internal metadata. +func mergeOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { + mdCopy, _ := metadata.FromOutgoingContext(ctx) + return metadata.NewOutgoingContext(ctx, metadata.Join(mdCopy, md)) } diff --git a/datastore/datastore.go b/datastore/datastore.go index 91f92c437b51..e9ffbfab7f7c 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -71,27 +71,27 @@ func newDatastoreClient(conn *grpc.ClientConn, projectID string) pb.DatastoreCli } func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (*pb.LookupResponse, error) { - return dc.c.Lookup(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.Lookup(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (*pb.RunQueryResponse, error) { - return dc.c.RunQuery(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.RunQuery(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (*pb.BeginTransactionResponse, error) { - return dc.c.BeginTransaction(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.BeginTransaction(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (*pb.CommitResponse, error) { - return dc.c.Commit(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.Commit(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (*pb.RollbackResponse, error) { - return dc.c.Rollback(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.Rollback(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (*pb.AllocateIdsResponse, error) { - return dc.c.AllocateIds(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.AllocateIds(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } // Client is a client for reading and writing data in a datastore dataset. diff --git a/spanner/client.go b/spanner/client.go index 84e68bfcdd7b..102ba4937b7d 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -96,12 +96,12 @@ func errDial(ci int, err error) error { return e } -func contextWithMetadata(ctx context.Context, md metadata.MD) context.Context { - existing, ok := metadata.FromContext(ctx) +func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { + existing, ok := metadata.FromOutgoingContext(ctx) if ok { md = metadata.Join(existing, md) } - return metadata.NewContext(ctx, md) + return metadata.NewOutgoingContext(ctx, md) } // NewClient creates a client to a database. A valid database name has the diff --git a/spanner/session.go b/spanner/session.go index ed78d80c4760..1ac5d6fb0794 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -166,7 +166,7 @@ func (s *session) ping() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() return runRetryable(ctx, func(ctx context.Context) error { - _, err := s.client.GetSession(contextWithMetadata(ctx, s.pool.md), &sppb.GetSessionRequest{Name: s.getID()}) // s.getID is safe even when s is invalid. + _, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.GetSessionRequest{Name: s.getID()}) // s.getID is safe even when s is invalid. return err }) } @@ -185,7 +185,7 @@ func (s *session) refreshIdle() bool { defer cancel() var sid string err := runRetryable(ctx, func(ctx context.Context) error { - session, e := s.client.CreateSession(contextWithMetadata(ctx, s.pool.md), &sppb.CreateSessionRequest{Database: s.pool.db}) + session, e := s.client.CreateSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.CreateSessionRequest{Database: s.pool.db}) if e != nil { return e } @@ -216,7 +216,7 @@ func (s *session) refreshIdle() bool { // If we fail to explicitly destroy the session, it will be eventually garbage collected by // Cloud Spanner. if err = runRetryable(ctx, func(ctx context.Context) error { - _, e := s.client.DeleteSession(contextWithMetadata(ctx, s.pool.md), &sppb.DeleteSessionRequest{Name: sid}) + _, e := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.DeleteSessionRequest{Name: sid}) return e }); err != nil { return false @@ -541,7 +541,7 @@ func (p *sessionPool) isHealthy(s *session) bool { // take returns a cached session if there are available ones; if there isn't any, it tries to allocate a new one. // Session returned by take should be used for read operations. func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) { - ctx = contextWithMetadata(ctx, p.md) + ctx = contextWithOutgoingMetadata(ctx, p.md) for { var ( s *session @@ -595,7 +595,7 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) { // takeWriteSession returns a write prepared cached session if there are available ones; if there isn't any, it tries to allocate a new one. // Session returned should be used for read write transactions. func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) { - ctx = contextWithMetadata(ctx, p.md) + ctx = contextWithOutgoingMetadata(ctx, p.md) for { var ( s *session @@ -927,7 +927,7 @@ func (hc *healthChecker) worker(i int) { if ws != nil { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - ws.prepareForWrite(contextWithMetadata(ctx, hc.pool.md)) + ws.prepareForWrite(contextWithOutgoingMetadata(ctx, hc.pool.md)) hc.pool.recycle(ws) hc.pool.mu.Lock() hc.pool.prepareReqs-- diff --git a/spanner/transaction.go b/spanner/transaction.go index 1d5a27977216..804446157d07 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -96,7 +96,7 @@ func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, ke return &RowIterator{err: errSessionClosed(sh)} } return stream( - contextWithMetadata(ctx, sh.getMetadata()), + contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { return client.StreamingRead(ctx, &sppb.ReadRequest{ @@ -167,7 +167,7 @@ func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterato return &RowIterator{err: err} } return stream( - contextWithMetadata(ctx, sh.getMetadata()), + contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { req.ResumeToken = resumeToken return client.ExecuteStreamingSql(ctx, req) @@ -296,7 +296,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { if err != nil { return err } - err = runRetryable(contextWithMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error { + err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error { res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{ Session: sh.getID(), Options: &sppb.TransactionOptions{ @@ -646,7 +646,7 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error { t.state = txActive return nil } - tx, err := beginTransaction(contextWithMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) + tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) if err == nil { t.tx = tx t.state = txActive @@ -673,7 +673,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) { if sid == "" || client == nil { return ts, errSessionClosed(t.sh) } - err = runRetryable(contextWithMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { + err = runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { var trailer metadata.MD res, e := client.Commit(ctx, &sppb.CommitRequest{ Session: sid, @@ -707,7 +707,7 @@ func (t *ReadWriteTransaction) rollback(ctx context.Context) { if sid == "" || client == nil { return } - err := runRetryable(contextWithMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { + err := runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { _, e := client.Rollback(ctx, &sppb.RollbackRequest{ Session: sid, TransactionId: t.tx, @@ -776,7 +776,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta return e } } - res, e := sh.getClient().Commit(contextWithMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ + res, e := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ Session: sh.getID(), Transaction: &sppb.CommitRequest_SingleUseTransaction{ SingleUseTransaction: &sppb.TransactionOptions{ diff --git a/trace/grpc.go b/trace/grpc.go index 39cb13eaf4d3..9775d1962580 100644 --- a/trace/grpc.go +++ b/trace/grpc.go @@ -49,7 +49,7 @@ func grpcUnaryInterceptor(ctx context.Context, method string, req, reply interfa md = md.Copy() // metadata is immutable, copy. md[grpcMetadataKey] = []string{header} } - ctx = metadata.NewContext(ctx, md) + ctx = metadata.NewOutgoingContext(ctx, md) } err := invoker(ctx, method, req, reply, cc, opts...)