Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace grpc/metadata's NewContext and FromContext calls with appropriate Incoming/Outgoing versions #627

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *Client) Close() error {
}

var (
idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted,codes.Internal}
idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted, codes.Internal}
isIdempotentRetryCode = make(map[codes.Code]bool)
retryOptions = []gax.CallOption{
gax.WithDelayTimeoutSettings(100*time.Millisecond, 2000*time.Millisecond, 1.2),
Expand Down Expand Up @@ -120,7 +120,7 @@ func (c *Client) Open(table string) *Table {
// By default, the yielded rows will contain all values in all cells.
// Use RowFilter to limit the cells returned.
func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error {
ctx = mergeMetadata(ctx, t.md)
ctx = mergeOutgoingMetadata(ctx, t.md)

var prevRowKey string
err := gax.Invoke(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -337,7 +337,7 @@ func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
}

func (r RowRangeList) valid() bool {
for _, rr := range r {
for _, rr := range r {
if rr.valid() {
return true
}
Expand Down Expand Up @@ -420,7 +420,7 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool {

// Apply applies a Mutation to a specific row.
func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error {
ctx = mergeMetadata(ctx, t.md)
ctx = mergeOutgoingMetadata(ctx, t.md)
after := func(res proto.Message) {
for _, o := range opts {
o.after(res)
Expand Down Expand Up @@ -582,7 +582,7 @@ type entryErr struct {
//
// Conditional mutations cannot be applied in bulk and providing one will result in an error.
func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) ([]error, error) {
ctx = mergeMetadata(ctx, t.md)
ctx = mergeOutgoingMetadata(ctx, t.md)
if len(rowKeys) != len(muts) {
return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
}
Expand Down Expand Up @@ -711,13 +711,13 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp {
if ts == ServerTime {
return ts
}
return ts - ts % 1000
return ts - ts%1000
}

// ApplyReadModifyWrite applies a ReadModifyWrite to a specific row.
// It returns the newly written cells.
func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
ctx = mergeMetadata(ctx, t.md)
ctx = mergeOutgoingMetadata(ctx, t.md)
req := &btpb.ReadModifyWriteRowRequest{
TableName: t.c.fullTableName(t.table),
RowKey: []byte(row),
Expand Down Expand Up @@ -774,9 +774,9 @@ func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
})
}

// mergeMetadata returns a context populated by the existing metadata, if any,
// joined with internal metadata.
func mergeMetadata(ctx context.Context, md metadata.MD) context.Context {
mdCopy, _ := metadata.FromContext(ctx)
return metadata.NewContext(ctx, metadata.Join(mdCopy, md))
// mergeOutgoingMetadata returns a context populated by the existing outgoing metadata,
// if any, joined with internal metadata.
func mergeOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
mdCopy, _ := metadata.FromOutgoingContext(ctx)
return metadata.NewOutgoingContext(ctx, metadata.Join(mdCopy, md))
}
12 changes: 6 additions & 6 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,27 @@ func newDatastoreClient(conn *grpc.ClientConn, projectID string) pb.DatastoreCli
}

func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (*pb.LookupResponse, error) {
return dc.c.Lookup(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.Lookup(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (*pb.RunQueryResponse, error) {
return dc.c.RunQuery(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.RunQuery(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (*pb.BeginTransactionResponse, error) {
return dc.c.BeginTransaction(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.BeginTransaction(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (*pb.CommitResponse, error) {
return dc.c.Commit(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.Commit(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (*pb.RollbackResponse, error) {
return dc.c.Rollback(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.Rollback(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (*pb.AllocateIdsResponse, error) {
return dc.c.AllocateIds(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.AllocateIds(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

// Client is a client for reading and writing data in a datastore dataset.
Expand Down
6 changes: 3 additions & 3 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ func errDial(ci int, err error) error {
return e
}

func contextWithMetadata(ctx context.Context, md metadata.MD) context.Context {
existing, ok := metadata.FromContext(ctx)
func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
existing, ok := metadata.FromOutgoingContext(ctx)
if ok {
md = metadata.Join(existing, md)
}
return metadata.NewContext(ctx, md)
return metadata.NewOutgoingContext(ctx, md)
}

// NewClient creates a client to a database. A valid database name has the
Expand Down
12 changes: 6 additions & 6 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *session) ping() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return runRetryable(ctx, func(ctx context.Context) error {
_, err := s.client.GetSession(contextWithMetadata(ctx, s.pool.md), &sppb.GetSessionRequest{Name: s.getID()}) // s.getID is safe even when s is invalid.
_, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.GetSessionRequest{Name: s.getID()}) // s.getID is safe even when s is invalid.
return err
})
}
Expand All @@ -185,7 +185,7 @@ func (s *session) refreshIdle() bool {
defer cancel()
var sid string
err := runRetryable(ctx, func(ctx context.Context) error {
session, e := s.client.CreateSession(contextWithMetadata(ctx, s.pool.md), &sppb.CreateSessionRequest{Database: s.pool.db})
session, e := s.client.CreateSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.CreateSessionRequest{Database: s.pool.db})
if e != nil {
return e
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func (s *session) refreshIdle() bool {
// If we fail to explicitly destroy the session, it will be eventually garbage collected by
// Cloud Spanner.
if err = runRetryable(ctx, func(ctx context.Context) error {
_, e := s.client.DeleteSession(contextWithMetadata(ctx, s.pool.md), &sppb.DeleteSessionRequest{Name: sid})
_, e := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.DeleteSessionRequest{Name: sid})
return e
}); err != nil {
return false
Expand Down Expand Up @@ -541,7 +541,7 @@ func (p *sessionPool) isHealthy(s *session) bool {
// take returns a cached session if there are available ones; if there isn't any, it tries to allocate a new one.
// Session returned by take should be used for read operations.
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
ctx = contextWithMetadata(ctx, p.md)
ctx = contextWithOutgoingMetadata(ctx, p.md)
for {
var (
s *session
Expand Down Expand Up @@ -595,7 +595,7 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
// takeWriteSession returns a write prepared cached session if there are available ones; if there isn't any, it tries to allocate a new one.
// Session returned should be used for read write transactions.
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
ctx = contextWithMetadata(ctx, p.md)
ctx = contextWithOutgoingMetadata(ctx, p.md)
for {
var (
s *session
Expand Down Expand Up @@ -927,7 +927,7 @@ func (hc *healthChecker) worker(i int) {
if ws != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ws.prepareForWrite(contextWithMetadata(ctx, hc.pool.md))
ws.prepareForWrite(contextWithOutgoingMetadata(ctx, hc.pool.md))
hc.pool.recycle(ws)
hc.pool.mu.Lock()
hc.pool.prepareReqs--
Expand Down
14 changes: 7 additions & 7 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, ke
return &RowIterator{err: errSessionClosed(sh)}
}
return stream(
contextWithMetadata(ctx, sh.getMetadata()),
contextWithOutgoingMetadata(ctx, sh.getMetadata()),
func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.StreamingRead(ctx,
&sppb.ReadRequest{
Expand Down Expand Up @@ -167,7 +167,7 @@ func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterato
return &RowIterator{err: err}
}
return stream(
contextWithMetadata(ctx, sh.getMetadata()),
contextWithOutgoingMetadata(ctx, sh.getMetadata()),
func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
req.ResumeToken = resumeToken
return client.ExecuteStreamingSql(ctx, req)
Expand Down Expand Up @@ -296,7 +296,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
if err != nil {
return err
}
err = runRetryable(contextWithMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Expand Down Expand Up @@ -646,7 +646,7 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error {
t.state = txActive
return nil
}
tx, err := beginTransaction(contextWithMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
if err == nil {
t.tx = tx
t.state = txActive
Expand All @@ -673,7 +673,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
if sid == "" || client == nil {
return ts, errSessionClosed(t.sh)
}
err = runRetryable(contextWithMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
err = runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
var trailer metadata.MD
res, e := client.Commit(ctx, &sppb.CommitRequest{
Session: sid,
Expand Down Expand Up @@ -707,7 +707,7 @@ func (t *ReadWriteTransaction) rollback(ctx context.Context) {
if sid == "" || client == nil {
return
}
err := runRetryable(contextWithMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
err := runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
_, e := client.Rollback(ctx, &sppb.RollbackRequest{
Session: sid,
TransactionId: t.tx,
Expand Down Expand Up @@ -776,7 +776,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
return e
}
}
res, e := sh.getClient().Commit(contextWithMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
res, e := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
Session: sh.getID(),
Transaction: &sppb.CommitRequest_SingleUseTransaction{
SingleUseTransaction: &sppb.TransactionOptions{
Expand Down
2 changes: 1 addition & 1 deletion trace/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func grpcUnaryInterceptor(ctx context.Context, method string, req, reply interfa
md = md.Copy() // metadata is immutable, copy.
md[grpcMetadataKey] = []string{header}
}
ctx = metadata.NewContext(ctx, md)
ctx = metadata.NewOutgoingContext(ctx, md)
}

err := invoker(ctx, method, req, reply, cc, opts...)
Expand Down