Skip to content

Commit

Permalink
dynamically connect to a target specified in client attrs
Browse files Browse the repository at this point in the history
  • Loading branch information
demmer committed Nov 3, 2023
1 parent e41b6ed commit 97120bd
Showing 1 changed file with 38 additions and 32 deletions.
70 changes: 38 additions & 32 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"flag"
"fmt"
"io"
"sync"
"time"

"google.golang.org/grpc"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/schema"
Expand All @@ -38,49 +38,66 @@ import (
)

var (
target = flag.String("target", "", "vtgate host:port target used to dial the GRPC connection")
dialTimeout = flag.Duration("dial_timeout", 5*time.Second, "dialer timeout for the GRPC connection")

defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable")
sysVarSetEnabled = flag.Bool("enable_system_settings", true, "This will enable the system settings to be changed per session at the database connection level")

vtGateProxy *VTGateProxy = &VTGateProxy{}
vtGateProxy *VTGateProxy = &VTGateProxy{
targetConns: map[string]*vtgateconn.VTGateConn{},
mu: sync.Mutex{},
}
)

type VTGateProxy struct {
conn *vtgateconn.VTGateConn
targetConns map[string]*vtgateconn.VTGateConn
mu sync.Mutex
}

func (proxy *VTGateProxy) connect(ctx context.Context) error {
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithBlock()), nil
})
func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) {
// If the connection exists, return it
proxy.mu.Lock()
conn, _ := proxy.targetConns[target]
if conn != nil {
proxy.mu.Unlock()
return conn, nil
}
proxy.mu.Unlock()

// Otherwise create a new connection after dropping the lock, allowing multiple requests to
// race to create the conn for now.
// grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
// return append(opts, grpc.WithBlock()), nil
// })

grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
})

conn, err := vtgateconn.DialProtocol(ctx, "grpc", *target)
conn, err := vtgateconn.DialProtocol(ctx, "grpc", target)
if err != nil {
return err
return nil, err
}

proxy.conn = conn
return nil
proxy.mu.Lock()
proxy.targetConns[target] = conn
proxy.mu.Unlock()

return conn, nil
}

func (proxy *VTGateProxy) NewSession(options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) {
if proxy.conn == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "not connnected")
func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) {
target, ok := connectionAttributes["target"]
if !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no target string supplied by client")
}

target, ok := connectionAttributes["target"]
if ok {
fmt.Printf("Creating new session from upstream provided target string: %v\n", target)
conn, err := proxy.getConnection(ctx, target)
if err != nil {
return nil, err
}

// XXX/demmer handle schemaName?
return proxy.conn.Session("", options), nil
return conn.Session("", options), nil
}

// CloseSession closes the session, rolling back any implicit transactions. This has the
Expand Down Expand Up @@ -128,16 +145,5 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn
return nil
}

func Init() error {
// XXX maybe add connect timeout?
ctx, cancel := context.WithTimeout(context.Background(), *dialTimeout)
defer cancel()
err := vtGateProxy.connect(ctx)
if err != nil {
log.Fatalf("error connecting to vtgate: %v", err)
return err
}
log.Infof("Connected to VTGate at %s", *target)

return nil
func Init() {
}

0 comments on commit 97120bd

Please sign in to comment.