clientv3: add SetEndpoints method #6330
eps = append(eps, ep)
}

c.balancer.mu.Lock()
Only balancer methods should lock the balancer, so there's a clear division of responsibility.
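A minimal sketch of that division of responsibility, assuming a heavily simplified balancer. The names `balancer`, `updateAddrs`, `snapshot`, and this `Client` are illustrative stand-ins, not the actual clientv3 code. The client never touches the balancer's mutex; it calls a balancer method that takes the lock itself.

```go
package main

import (
	"fmt"
	"sync"
)

// balancer is a hypothetical stand-in for the client's balancer.
type balancer struct {
	mu    sync.RWMutex
	addrs []string
}

// updateAddrs replaces the address list. The lock is taken here,
// inside the balancer, so callers never touch b.mu directly.
func (b *balancer) updateAddrs(eps []string) {
	cp := make([]string, len(eps))
	copy(cp, eps)
	b.mu.Lock()
	b.addrs = cp
	b.mu.Unlock()
}

// snapshot returns a copy of the current addresses.
func (b *balancer) snapshot() []string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return append([]string(nil), b.addrs...)
}

// Client is a hypothetical client that owns a balancer.
type Client struct {
	balancer *balancer
}

// SetEndpoints delegates to the balancer instead of locking it.
func (c *Client) SetEndpoints(eps ...string) {
	c.balancer.updateAddrs(eps)
}

func main() {
	c := &Client{balancer: &balancer{}}
	c.SetEndpoints("127.0.0.1:2379", "127.0.0.1:22379")
	fmt.Println(c.balancer.snapshot())
}
```

Keeping the mutex private to the balancer's own methods means a later change to its locking scheme does not ripple into the client.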
@heyitsanthony @gyuho Shouldn't the first step be letting users change the endpoints? Then the auto-membership-sync would be a wrapper around it?
@@ -62,6 +63,39 @@ func TestMemberAdd(t *testing.T) {
}
}

func TestMemberRemoveWithAutoSync(t *testing.T) {
This is only testing that the client is getting the member list when the important part is really client connectivity. A good test of functionality would be something like:
- boot a cluster with one member
- attach an autosync client
- add two more members
- wait for sync to grab the new members
- stop the original member
- use the client to access the cluster (the rpc will only go through if the sync worked)
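The steps above can be walked through in a toy in-memory model. This is only a sketch of the scenario's logic: the `cluster` and `client` types and the `runScenario` helper are hypothetical, and a real test would use the etcd integration test cluster instead.

```go
package main

import (
	"errors"
	"fmt"
)

// cluster is a toy model: endpoint -> is the member currently running?
type cluster struct {
	members map[string]bool
}

func (cl *cluster) add(ep string)  { cl.members[ep] = true }
func (cl *cluster) stop(ep string) { cl.members[ep] = false }

// client is a toy client that only knows a list of endpoints.
type client struct {
	cl        *cluster
	endpoints []string
}

// get succeeds only if some known endpoint is still running.
func (c *client) get() error {
	for _, ep := range c.endpoints {
		if c.cl.members[ep] {
			return nil
		}
	}
	return errors.New("no reachable endpoint")
}

// sync refreshes the endpoint list from the cluster's member list.
// It needs at least one reachable known endpoint to ask.
func (c *client) sync() error {
	if err := c.get(); err != nil {
		return err
	}
	eps := make([]string, 0, len(c.cl.members))
	for ep := range c.cl.members {
		eps = append(eps, ep)
	}
	c.endpoints = eps
	return nil
}

// runScenario follows the review's steps; doSync toggles the sync step.
func runScenario(doSync bool) error {
	// Boot a cluster with one member and attach a client to it.
	cl := &cluster{members: map[string]bool{}}
	cl.add("m0")
	c := &client{cl: cl, endpoints: []string{"m0"}}
	// Add two more members.
	cl.add("m1")
	cl.add("m2")
	// Let the sync pick up the new members.
	if doSync {
		if err := c.sync(); err != nil {
			return err
		}
	}
	// Stop the original member, then use the client.
	cl.stop("m0")
	return c.get()
}

func main() {
	fmt.Println("with sync:", runScenario(true))
	fmt.Println("without sync:", runScenario(false))
}
```

The request after stopping the original member only succeeds when the sync ran, which is the property the proposed test is meant to check.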
OK, will rework this. Thanks.
1e6fa9f to f010129
clus.Members[0].Stop(t)
clus.WaitLeader(t)

// rpc will only go through if the sync worked
This will fail because adding more endpoints doesn't update the unavailable pinned endpoint.
Do we expect users to detect the unavailable endpoint and change the pinned one manually? How should we test this?
The balancer should automatically choose another endpoint to pin when it detects the old pinned endpoint is down. For a non-mutable RPC like Get or MemberList, the user shouldn't notice the failover.
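A rough sketch of that failover behavior, under simplifying assumptions: `pinBalancer` is a hypothetical type, and its `Up` takes a plain string and returns `func()`, whereas the real `grpc.Balancer` interface uses `grpc.Address` and a `down func(error)`. In the real client the transport layer (grpc) calls `Up`; `main` below only plays that role.

```go
package main

import (
	"fmt"
	"sync"
)

// pinBalancer is a hypothetical balancer that pins one live address.
type pinBalancer struct {
	mu      sync.Mutex
	up      map[string]bool // addresses with an established connection
	pinAddr string          // address currently used for RPCs
}

func newPinBalancer() *pinBalancer {
	return &pinBalancer{up: make(map[string]bool)}
}

// Up is called by the transport layer when a connection to addr is
// established. The returned func is called when that connection drops.
func (b *pinBalancer) Up(addr string) (down func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.up[addr] = true
	if b.pinAddr == "" {
		b.pinAddr = addr
	}
	return func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.up, addr)
		if b.pinAddr != addr {
			return
		}
		// The pinned endpoint went down: pin any other live address.
		b.pinAddr = ""
		for a := range b.up {
			b.pinAddr = a
			break
		}
	}
}

// pinned reports the currently pinned address ("" if none).
func (b *pinBalancer) pinned() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.pinAddr
}

func main() {
	b := newPinBalancer()
	downA := b.Up("a:2379")
	b.Up("b:2379")
	fmt.Println("pinned before failure:", b.pinned())
	downA() // the transport reports that a:2379 went down
	fmt.Println("pinned after failure:", b.pinned())
}
```

A caller that retries a read-only RPC against whatever `pinned()` returns would not need to know which endpoint failed.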
I think our clientv3 does not yet implement the Start(target string) method of the grpc.Balancer interface?
func (b *simpleBalancer) Start(target string) error { return nil }
So there's no routine that watches unavailable endpoints and changes the pinned one by calling Up(addr).
The balancer shouldn't call Up; only grpc does that, since that's how it signals to the balancer that it established a connection. I don't think implementing Start will help; it looks like it's only called on Dial, and the endpoint updates from sync shouldn't trigger the dial path.
You are right. grpc internally calls Up via transportMonitor and resetAddrConn, but it is not being called in our test case, for some reason. I will look into it more.
67736d0 to fff2ce4
@heyitsanthony This needs more work than I thought. Problems with this PR are
Any suggestions? Thanks!
5fd6a0c to 096ff3b
addrs = append(addrs, grpc.Address{Addr: getHost(ep)})
}
b.addrs = addrs
b.notifyCh <- addrs
@heyitsanthony So I am seeing that gRPC is being notified of these new addrs via Notify and lbWatchers, but I am still seeing the same error in tests. Do you see anything else that I am doing wrong?
I was talking about https://github.com/grpc/grpc-go/blob/master/clientconn.go#L384-L385.
I'm not really familiar with the semantics of the notify channel. I don't think this function should be resetting readyc/upEps/upc/pinAddr though; anything that's waiting on upc or readyc will break for sure.
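One way to read this suggestion, sketched with a hypothetical `notifyBalancer`: the update swaps the address list and publishes it on the notify channel, but leaves the readiness channel alone. The field names echo the ones mentioned in the thread; the struct layout and the buffered, capacity-1 notify channel are assumptions made for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// notifyBalancer is a hypothetical, cut-down balancer.
type notifyBalancer struct {
	mu       sync.Mutex
	addrs    []string
	notifyCh chan []string // capacity 1: holds only the latest address list
	upc      chan struct{} // closed once an endpoint is up; waiters block on it
}

func newNotifyBalancer(eps []string) *notifyBalancer {
	return &notifyBalancer{
		addrs:    eps,
		notifyCh: make(chan []string, 1),
		upc:      make(chan struct{}),
	}
}

// updateAddrs swaps the address list and publishes it to the watcher.
// It deliberately does not recreate upc, so goroutines already
// waiting on that channel keep waiting on the same channel.
func (b *notifyBalancer) updateAddrs(eps []string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.addrs = append([]string(nil), eps...)
	// Drop a stale pending update, if any, then publish the new list.
	// Only this method sends, and it holds the lock, so this never blocks.
	select {
	case <-b.notifyCh:
	default:
	}
	b.notifyCh <- b.addrs
}

func main() {
	b := newNotifyBalancer([]string{"a:2379"})
	waitingOn := b.upc // what an in-flight waiter would hold
	b.updateAddrs([]string{"a:2379", "b:2379"})
	b.updateAddrs([]string{"b:2379", "c:2379"})
	fmt.Println("latest notification:", <-b.notifyCh)
	fmt.Println("upc unchanged:", waitingOn == b.upc)
}
```

Because `upc` is never replaced, a goroutine blocked on the channel it captured earlier is still released when that same channel is eventually closed.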
Yeah, I need to read more of the gRPC code. Will rework this. Thanks.
096ff3b to 5ad9f16
5ad9f16 to c3c078a
274c1b1 to e4d6312
I believe the test failures are not related to this.
95d5a26 to 4a51719
@heyitsanthony PTAL. The test failures were related to the changes; now fixed. Thanks.
95d2a15 to dfc8c73
@@ -171,7 +183,8 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts
}
opts = append(opts, grpc.WithDialer(f))

_, _, creds := c.dialTarget(endpoint)
proto, _, scheme := parseEndpoint(endpoint)
creds := c.processCreds(proto, scheme)
I think processCreds can do without the bool argument:
creds := c.creds
if proto, _, scheme := parseEndpoint(endpoint); scheme {
creds = c.processCreds(proto)
}
lgtm following
dfc8c73 to f11b35e
Fix #5491.
Related to #5920.