Skip to content

Commit

Permalink
Fix locking in getConnection (#288)
Browse files Browse the repository at this point in the history
* Fix locking in getConnection

Signed-off-by: Henry Robinson <[email protected]>

* Comment

Signed-off-by: Henry Robinson <[email protected]>

* Undo bash change

Signed-off-by: Henry Robinson <[email protected]>

---------

Signed-off-by: Henry Robinson <[email protected]>
  • Loading branch information
henryr authored Apr 10, 2024
1 parent de9cbc1 commit 83f23bc
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync"

"google.golang.org/grpc"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/grpcclient"
Expand Down Expand Up @@ -59,32 +60,36 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt
// If the connection exists, return it
proxy.mu.RLock()
existingConn := proxy.targetConns[target]
proxy.mu.RUnlock()

if existingConn != nil {
proxy.mu.RUnlock()
log.V(100).Infof("Reused connection for %v\n", target)
return existingConn, nil
}

// No luck, need to create a new one. Serialize new additions so we don't create multiple
// for a given target.
log.V(100).Infof("Need to create connection for %v\n", target)
proxy.mu.RUnlock()

proxy.mu.Lock()
defer proxy.mu.Unlock()

// Otherwise create a new connection after dropping the lock, allowing multiple requests to
// race to create the conn for now.
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
})
// Check again in case conn was made between lock acquisitions.
existingConn = proxy.targetConns[target]
if existingConn != nil {
log.V(100).Infof("Reused connection for %v\n", target)
return existingConn, nil
}

// Otherwise create a new connection. TODO: confirm this doesn't actually make a TCP connection, and returns quickly,
// otherwise we're going to have to do this while not holding the lock.
conn, err := vtgateconn.DialProtocol(ctx, "grpc", target)
if err != nil {
return nil, err
}

log.V(100).Infof("Created new connection for %v\n", target)
proxy.targetConns[target] = conn
proxy.mu.Unlock()

return conn, nil
}
Expand Down Expand Up @@ -174,4 +179,8 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn
}

func Init() {
log.V(100).Infof("Registering GRPC dial options")
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
})
}

0 comments on commit 83f23bc

Please sign in to comment.