diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 4cc7910e164..bee6755ff64 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -15,6 +15,7 @@ package etcdmain import ( + "context" "fmt" "math" "net" @@ -26,6 +27,7 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/namespace" + "github.com/coreos/etcd/clientv3/ordering" "github.com/coreos/etcd/etcdserver/api/etcdhttp" "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb" "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb" @@ -69,7 +71,8 @@ var ( grpcProxyNamespace string - grpcProxyEnablePprof bool + grpcProxyEnablePprof bool + grpcProxyEnableOrdering bool ) func init() { @@ -119,6 +122,9 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates") cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.") + // experimental flags + cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.") + return &cmd } @@ -255,6 +261,20 @@ func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux { } func newGRPCProxyServer(client *clientv3.Client) *grpc.Server { + if grpcProxyEnableOrdering { + vf := ordering.NewOrderViolationSwitchEndpointClosure(*client) + client.KV = ordering.NewKV(client.KV, vf) + plog.Infof("waiting for linearized read from cluster to recover ordering") + for { + _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly()) + if err == nil { + break + } + plog.Warningf("ordering recovery failed, retrying in 1s (%v)", err) + time.Sleep(time.Second) + } + } + if len(grpcProxyNamespace) > 0 { client.KV = namespace.NewKV(client.KV, grpcProxyNamespace) client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)