Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
registry/rpc: use simpleBalancer instead of ClientConn.State()
Browse files Browse the repository at this point in the history
As ClientConn.State() of gRPC has disappeared, we need to avoid using
ClientConn.State(). Instead we should make use of gRPC rebalancer
mechanism, just like etcdv3 is doing. To do that, introduce
simpleBalancer, as a minimum structure to be used for grpc.Balancer.
  • Loading branch information
Dongsu Park committed Nov 10, 2016
1 parent 423e390 commit ecb121a
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 46 deletions.
7 changes: 6 additions & 1 deletion engine/rpcengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ func rpcAcquireLeadership(reg registry.Registry, lManager lease.Manager, machID
return l
}

if existing != nil && existing.Version() >= ver {
// If reg is not ready, we have to give it an opportunity to steal lease
// below. Otherwise it could be blocked forever by the existing engine leader,
// which could cause gRPC registry to always fail when a leader already exists.
// Thus we return the existing leader, only if reg.IsRegistryReady() == true.
// TODO(dpark): refactor the entire function for better readability. - 20160908
if (existing != nil && existing.Version() >= ver) && reg.IsRegistryReady() {
log.Debugf("Lease already held by Machine(%s) operating at acceptable version %d", existing.MachineID(), existing.Version())
return existing
}
Expand Down
80 changes: 80 additions & 0 deletions registry/rpc/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2016 The fleet Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpc

import (
"sync"
"sync/atomic"

"golang.org/x/net/context"
"google.golang.org/grpc"
)

// simpleBalancer implements grpc.Balancer interface, being as simple as possible.
// to be used only for fleet.
//
// In principle grpc.Balancer is meant to be handling load balancer across
// multiple connections via addresses for RPCs.
// * Start() does initialization work to bootstrap a Balancer.
// * Up() informs the Balancer that gRPC has a connection to the server at addr.
// It returns Down() which is called once the connection to addr gets lost
// or closed.
// * Get() gets the address of a server for the RPC corresponding to ctx.
// * Notify() returns a channel that is used by gRPC internals to watch the
// addresses gRPC needs to connect.
// * Close() shuts down the balancer.
//
// However, as fleet needs to care only about a single connection, simpleBalancer
// in fleet should be kept as simple as possible. Most crucially simpleBalancer
// provides a simple channel, readyc, to notify the rpcRegistry of the connection
// being available. readyc gets closed in Up(), which will cause, for example,
// IsRegistryReady() to recognize that the connection is available. We don't need
// to care about which value the readyc has.
type simpleBalancer struct {
addrs []string
numGets uint32

// readyc closes once the first connection is up
readyc chan struct{}
readyOnce sync.Once
}

func newSimpleBalancer(eps []string) *simpleBalancer {
return &simpleBalancer{
addrs: eps,
readyc: make(chan struct{}),
}
}

func (b *simpleBalancer) Start(target string) error { return nil }

func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.readyOnce.Do(func() { close(b.readyc) })
return func(error) {}
}

func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
v := atomic.AddUint32(&b.numGets, 1)
addr := b.addrs[v%uint32(len(b.addrs))]

return grpc.Address{Addr: addr}, func() {}, nil
}

func (b *simpleBalancer) Notify() <-chan []grpc.Address { return nil }

func (b *simpleBalancer) Close() error {
b.readyc = make(chan struct{})
return nil
}
60 changes: 16 additions & 44 deletions registry/rpc/rpcregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,14 @@ import (
"github.com/coreos/fleet/unit"
)

const (
grpcConnectionTimeout = 5000 * time.Millisecond

grpcConnectionStateReady = "READY"
grpcConnectionStateConnecting = "CONNECTING"
grpcConnectionStateShutdown = "SHUTDOWN"
grpcConnectionStateFailure = "TRANSIENT_FAILURE"
)

var DebugRPCRegistry bool = false

type RPCRegistry struct {
dialer func(addr string, timeout time.Duration) (net.Conn, error)
mu *sync.Mutex
registryClient pb.RegistryClient
registryConn *grpc.ClientConn
balancer *simpleBalancer
}

func NewRPCRegistry(dialer func(string, time.Duration) (net.Conn, error)) *RPCRegistry {
Expand All @@ -63,35 +55,17 @@ func (r *RPCRegistry) ctx() context.Context {
}

func (r *RPCRegistry) getClient() pb.RegistryClient {
for {
st, err := r.registryConn.State()
if err != nil {
log.Fatalf("Unable to get the state of rpc connection: %v", err)
}
state := st.String()
if state == grpcConnectionStateReady {
break
} else if state == grpcConnectionStateConnecting {
if DebugRPCRegistry {
log.Infof("gRPC connection state: %s", state)
}
continue
} else if state == grpcConnectionStateFailure || state == grpcConnectionStateShutdown {
log.Infof("gRPC connection state '%s' reports an error in the connection", state)
log.Info("Reconnecting gRPC peer to fleet-engine...")
r.Connect()
}

time.Sleep(grpcConnectionTimeout)
}

return r.registryClient
}

func (r *RPCRegistry) Connect() {
// We want the connection operation to block and constantly reconnect using grpc backoff
log.Info("Starting gRPC connection to fleet-engine...")
connection, err := grpc.Dial(":fleet-engine:", grpc.WithTimeout(12*time.Second), grpc.WithInsecure(), grpc.WithDialer(r.dialer), grpc.WithBlock())
ep_engines := []string{":fleet-engine:"}
r.balancer = newSimpleBalancer(ep_engines)
connection, err := grpc.Dial(ep_engines[0],
grpc.WithTimeout(12*time.Second), grpc.WithInsecure(),
grpc.WithDialer(r.dialer), grpc.WithBlock(), grpc.WithBalancer(r.balancer))
if err != nil {
log.Fatalf("Unable to dial to registry: %s", err)
}
Expand All @@ -106,24 +80,22 @@ func (r *RPCRegistry) Close() {

func (r *RPCRegistry) IsRegistryReady() bool {
if r.registryConn != nil {
st, err := r.registryConn.State()
if err != nil {
log.Fatalf("Unable to get the state of rpc connection: %v", err)
}
connState := st.String()
log.Infof("Registry connection state: %s", connState)
if connState != grpcConnectionStateReady {
log.Errorf("unable to connect to registry connection state: %s", connState)
return false
hasConn := false
if r.balancer != nil {
select {
case <-r.balancer.readyc:
hasConn = true
}
}
log.Infof("Getting server status...")

status, err := r.Status()
if err != nil {
log.Errorf("unable to get the status of the registry service %v", err)
return false
}
log.Infof("Status of rpc service: %d, connection state: %s", status, connState)
return status == pb.HealthCheckResponse_SERVING && err == nil
log.Infof("Status of rpc service: %d, balancer has a connection: %t", status, hasConn)

return hasConn && status == pb.HealthCheckResponse_SERVING && err == nil
}
return false
}
Expand Down
4 changes: 3 additions & 1 deletion registry/rpc/rpcregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func TestRPCRegistryClientCreation(t *testing.T) {
t.Fatalf("failed to parse listener address: %v", err)
}
addr := "localhost:" + port
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), grpc.WithBlock())
b := newSimpleBalancer([]string{addr})
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second),
grpc.WithBlock(), grpc.WithBalancer(b))
if err != nil {
t.Fatalf("failed to dial to the server %q: %v", addr, err)
}
Expand Down

0 comments on commit ecb121a

Please sign in to comment.