Skip to content

Commit

Permalink
etcdmain: add --experimental-serializable-ordering to grpc proxy
Browse files Browse the repository at this point in the history
Connect to another endpoint on stale reads.
  • Loading branch information
Anthony Romano committed Jul 27, 2017
1 parent fca56f1 commit f6acd03
Showing 1 changed file with 21 additions and 1 deletion.
22 changes: 21 additions & 1 deletion etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package etcdmain

import (
"context"
"fmt"
"math"
"net"
Expand All @@ -26,6 +27,7 @@ import (

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/namespace"
"github.com/coreos/etcd/clientv3/ordering"
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
Expand Down Expand Up @@ -69,7 +71,8 @@ var (

grpcProxyNamespace string

grpcProxyEnablePprof bool
grpcProxyEnablePprof bool
grpcProxyEnableOrdering bool
)

func init() {
Expand Down Expand Up @@ -119,6 +122,9 @@ func newGRPCProxyStartCommand() *cobra.Command {
cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")

// experimental flags
cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")

return &cmd
}

Expand Down Expand Up @@ -255,6 +261,20 @@ func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
}

func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
if grpcProxyEnableOrdering {
vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
client.KV = ordering.NewKV(client.KV, vf)
plog.Infof("waiting for linearized read from cluster to recover ordering")
for {
_, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
if err == nil {
break
}
plog.Warningf("ordering recovery failed, retrying in 1s (%v)", err)
time.Sleep(time.Second)
}
}

if len(grpcProxyNamespace) > 0 {
client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
Expand Down

0 comments on commit f6acd03

Please sign in to comment.