Skip to content

Commit

Permalink
Merge branch 'master' into proxy_pr2
Browse files Browse the repository at this point in the history
  • Loading branch information
eshitachandwani authored Dec 23, 2024
2 parents 5ce4658 + 063d352 commit a292b63
Show file tree
Hide file tree
Showing 203 changed files with 6,545 additions and 2,954 deletions.
104 changes: 0 additions & 104 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,6 @@ func unregisterForTesting(name string) {
delete(m, name)
}

// connectedAddress returns the connected address for a SubConnState. The
// address is only valid if the state is READY.
func connectedAddress(scs SubConnState) resolver.Address {
return scs.connectedAddress
}

// setConnectedAddress sets the connected address for a SubConnState.
func setConnectedAddress(scs *SubConnState, addr resolver.Address) {
scs.connectedAddress = addr
}

func init() {
internal.BalancerUnregister = unregisterForTesting
internal.ConnectedAddress = connectedAddress
Expand All @@ -106,68 +95,6 @@ func Get(name string) Builder {
return nil
}

// A SubConn represents a single connection to a gRPC backend service.
//
// Each SubConn contains a list of addresses.
//
// All SubConns start in IDLE, and will not try to connect. To trigger the
// connecting, Balancers must call Connect. If a connection re-enters IDLE,
// Balancers must call Connect again to trigger a new connection attempt.
//
// gRPC will try to connect to the addresses in sequence, and stop trying the
// remainder once the first connection is successful. If an attempt to connect
// to all addresses encounters an error, the SubConn will enter
// TRANSIENT_FAILURE for a backoff period, and then transition to IDLE.
//
// Once established, if a connection is lost, the SubConn will transition
// directly to IDLE.
//
// This interface is to be implemented by gRPC. Users should not need their own
// implementation of this interface. For situations like testing, any
// implementations should embed this interface. This allows gRPC to add new
// methods to this interface.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
// If it's in the list, the connection will be kept.
// If it's not in the list, the connection will gracefully close, and
// a new connection will be created.
//
// This will trigger a state transition for the SubConn.
//
// Deprecated: this method will be removed. Create new SubConns for new
// addresses instead.
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
// GetOrBuildProducer returns a reference to the existing Producer for this
// ProducerBuilder in this SubConn, or, if one does not currently exist,
// creates a new one and returns it. Returns a close function which may be
// called when the Producer is no longer needed. Otherwise the producer
// will automatically be closed upon connection loss or subchannel close.
// Should only be called on a SubConn in state Ready. Otherwise the
// producer will be unable to create streams.
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
// Shutdown shuts down the SubConn gracefully. Any started RPCs will be
// allowed to complete. No future calls should be made on the SubConn.
// One final state update will be delivered to the StateListener (or
// UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to
// indicate the shutdown operation. This may be delivered before
// in-progress RPCs are complete and the actual connection is closed.
Shutdown()
// RegisterHealthListener registers a health listener that receives health
// updates for a Ready SubConn. Only one health listener can be registered
// at a time. A health listener should be registered each time the SubConn's
// connectivity state changes to READY. Registering a health listener when
// the connectivity state is not READY may result in undefined behaviour.
// This method must not be called synchronously while handling an update
// from a previously registered health listener.
RegisterHealthListener(func(SubConnState))
// enforceEmbedding is an unexported method to force implementers embed
// this interface, allowing gRPC to add methods without breaking users.
enforceEmbedding()
}

// NewSubConnOptions contains options to create new SubConn.
type NewSubConnOptions struct {
// CredsBundle is the credentials bundle that will be used in the created
Expand Down Expand Up @@ -435,18 +362,6 @@ type ExitIdler interface {
ExitIdle()
}

// SubConnState describes the state of a SubConn.
type SubConnState struct {
// ConnectivityState is the connectivity state of the SubConn.
ConnectivityState connectivity.State
// ConnectionError is set if the ConnectivityState is TransientFailure,
// describing the reason the SubConn failed. Otherwise, it is nil.
ConnectionError error
// connectedAddr contains the connected address when ConnectivityState is
// Ready. Otherwise, it is indeterminate.
connectedAddress resolver.Address
}

// ClientConnState describes the state of a ClientConn relevant to the
// balancer.
type ClientConnState struct {
Expand All @@ -459,22 +374,3 @@ type ClientConnState struct {
// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")

// A ProducerBuilder is a simple constructor for a Producer. It is used by the
// SubConn to create producers when needed.
type ProducerBuilder interface {
// Build creates a Producer. The first parameter is always a
// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the
// associated SubConn), but is declared as `any` to avoid a dependency
// cycle. Build also returns a close function that will be called when all
// references to the Producer have been given up for a SubConn, or when a
// connectivity state change occurs on the SubConn. The close function
// should always block until all asynchronous cleanup work is completed.
Build(grpcClientConnInterface any) (p Producer, close func())
}

// A Producer is a type shared among potentially many consumers. It is
// associated with a SubConn, and an implementation will typically contain
// other methods to provide additional functionality, e.g. configuration or
// subscription registration.
type Producer any
21 changes: 13 additions & 8 deletions balancer/endpointsharding/endpointsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,27 @@ package endpointsharding
import (
"encoding/json"
"errors"
"fmt"
rand "math/rand/v2"
"sync"
"sync/atomic"

rand "math/rand/v2"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

// PickFirstConfig is a pick first config without shuffling enabled.
var PickFirstConfig string

func init() {
PickFirstConfig = fmt.Sprintf("[{%q: {}}]", pickfirstleaf.Name)
}

// ChildState is the balancer state of a child along with the endpoint which
// identifies the child balancer.
type ChildState struct {
Expand Down Expand Up @@ -100,9 +108,6 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState

// Update/Create new children.
for _, endpoint := range state.ResolverState.Endpoints {
if len(endpoint.Addresses) == 0 {
continue
}
if _, ok := newChildren.Get(endpoint); ok {
// Endpoint child was already created, continue to avoid duplicate
// update.
Expand Down Expand Up @@ -143,6 +148,9 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
}
}
es.children.Store(newChildren)
if newChildren.Len() == 0 {
return balancer.ErrBadResolverState
}
return ret
}

Expand Down Expand Up @@ -306,6 +314,3 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return gracefulswitch.ParseConfig(cfg)
}

// PickFirstConfig is a pick first config without shuffling enabled.
const PickFirstConfig = "[{\"pick_first\": {}}]"
2 changes: 1 addition & 1 deletion balancer/grpclb/grpc_lb_v1/load_balancer.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 21 additions & 18 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (s) TestGRPCLB_Basic(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()

Expand Down Expand Up @@ -517,7 +517,7 @@ func (s) TestGRPCLB_Weighted(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()

Expand Down Expand Up @@ -597,7 +597,7 @@ func (s) TestGRPCLB_DropRequest(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
Expand Down Expand Up @@ -769,7 +769,7 @@ func (s) TestGRPCLB_BalancerDisconnects(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
Expand Down Expand Up @@ -940,7 +940,7 @@ func (s) TestGRPCLB_ExplicitFallback(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
Expand Down Expand Up @@ -1008,11 +1008,12 @@ func (s) TestGRPCLB_FallBackWithNoServerAddress(t *testing.T) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
}
cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
cc.Connect()
testC := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down Expand Up @@ -1102,10 +1103,11 @@ func (s) TestGRPCLB_PickFirst(t *testing.T) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
}
cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend: %v", err)
}
cc.Connect()
defer cc.Close()

// Push a service config with grpclb as the load balancing policy and
Expand Down Expand Up @@ -1198,7 +1200,7 @@ func (s) TestGRPCLB_BackendConnectionErrorPropagation(t *testing.T) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to create new client to the backend %v", err)
t.Fatalf("Failed to create a client for the backend: %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
Expand Down Expand Up @@ -1241,10 +1243,11 @@ func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
}
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
cc.Connect()
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)

Expand Down Expand Up @@ -1311,15 +1314,16 @@ func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
// Push the backend address to the remote balancer.
tss.ls.sls <- sl

cc, err := grpc.Dial(r.Scheme()+":///"+beServerName,
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName,
grpc.WithResolvers(r),
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
grpc.WithUserAgent(testUserAgent))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
cc.Connect()
testC := testgrpc.NewTestServiceClient(cc)

// Push a resolver update with grpclb configuration which does not contain the
Expand Down Expand Up @@ -1418,15 +1422,14 @@ func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats,
tss.ls.statsDura = 100 * time.Millisecond
creds := serverNameCheckCreds{}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds),
grpc.WithPerRPCCredentials(failPreRPCCred{}),
grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
cc.Connect()
defer cc.Close()

rstate := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}
Expand Down
18 changes: 9 additions & 9 deletions balancer/pickfirst/pickfirst_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,12 @@ func (s) TestPickFirst_StickyTransientFailure(t *testing.T) {
},
}),
}
cc, err := grpc.Dial(lis.Addr().String(), dopts...)
cc, err := grpc.NewClient(lis.Addr().String(), dopts...)
if err != nil {
t.Fatalf("Failed to dial server at %q: %v", lis.Addr(), err)
t.Fatalf("Failed to create new client: %v", err)
}
t.Cleanup(func() { cc.Close() })

cc.Connect()
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Spawn a goroutine to ensure that the channel stays in TransientFailure.
Expand Down Expand Up @@ -837,12 +837,12 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T)
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })

cc.Connect()
addrs := []resolver.Address{{Addr: lis.Addr().String()}}
r.UpdateState(resolver.State{Addresses: addrs})
testutils.AwaitState(ctx, t, cc, connectivity.Connecting)
Expand Down Expand Up @@ -892,12 +892,12 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *test
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })

cc.Connect()
addrs := []resolver.Address{{Addr: lis.Addr().String()}}
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down
Loading

0 comments on commit a292b63

Please sign in to comment.