Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add meta retry #94

Merged
merged 1 commit into from
Mar 16, 2022
Merged

add meta retry #94

merged 1 commit into from
Mar 16, 2022

Conversation

kqzh
Copy link
Contributor

@kqzh kqzh commented Mar 15, 2022

No description provided.

@kqzh kqzh requested a review from veezhang March 15, 2022 11:45
@kqzh kqzh force-pushed the fix_change_leader branch 2 times, most recently from 0e632fd to 588cb61 Compare March 16, 2022 03:56
Comment on lines +76 to +79
Spaces interface {
MetaBaser
GetSpaces() []Space
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about?

Spaces struct{
    MetaBaser
    spaces []Space
}

Comment on lines -120 to -123
if d.MetaClientDriver != nil {
return nil
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove?

"fmt"
"github.com/facebook/fbthrift/thrift/lib/go/thrift"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sort it?

Comment on lines 54 to 72
func (c *defaultMetaClient) AddHosts(endpoints []string) (types.MetaBaser, error) {
resp, err := c.meta.AddHosts(endpoints)
if err != nil {
if err = c.reconnect(err); err != nil {
return nil, err
}
return c.AddHosts(endpoints)
}

// check if leader changed
if resp.GetCode() == nerrors.ErrorCode_E_LEADER_CHANGED {
if err = c.updateConnection(resp.GetLeader()); err != nil {
return nil, err
}

return c.AddHosts(endpoints)
}

return resp, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (c *defaultMetaClient) BalanceLeader(space string) (resp types.Balancer, err error) {
	c.retryDo(func() (types.MetaBaser, error) {
		resp, err = c.meta.Balance(types.BalanceReq{
			Cmd:   types.BalanceLeader,
			Space: space,
		})
		return resp, err
	})
	return
}

func (c *defaultMetaClient) retryDo(fn func() (types.MetaBaser, error)) {
	resp, err := fn()
	if err != nil {
		if err = c.reconnect(err); err != nil {
			return
		}
		resp, err = fn()
		if err != nil {
			return
		}
	}

	if resp.GetCode() == nerrors.ErrorCode_E_LEADER_CHANGED {
		if err = c.updateConnection(resp.GetLeader()); err != nil {
			return
		}
		_, _ = fn()
	}
}

}

func (c *defaultMetaClient) defaultClient() *defaultClient {
return (*defaultClient)(c)
}

func (c *defaultMetaClient) updateConnection(endpoint string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (c *defaultMetaClient) updateConnection(endpoint string) error {
	if ok := c.meta.connection.SetEndpointExists(endpoint); ok {
		return c.meta.open(c.driver)
	}
	return error //
}

return c.meta.open(c.driver)
}

func (c *defaultMetaClient) reconnect(err error) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (c *defaultMetaClient) reconnect(err error) error {
	if _, ok := err.(thrift.TransportException); !ok {
		return err
	}
	return c.openRetry(c.driver)
}

func (c *defaultMetaClient) openRetry(driver types.Driver) error {
	for i := 0; i < c.meta.connection.GetEndpointLen(); i++ {
		c.meta.connection.UpdateNextEndpoint()
		if err := c.meta.open(driver); err != nil {
			if _, ok := err.(thrift.TransportException); !ok {
			}
			return err
		}
	}
	return nil //error
}

@@ -34,6 +34,7 @@ type (
o *socketOptions
mu sync.Mutex
endpoints []string
index int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use nextIndex ?

func (c *connectionMu) UpdateNextEndpoint() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nextIndex = (c.nextIndex + 1) % c.GetEndpointLen()
}

func (c *connectionMu) SetEndpointExists(endpoint string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	for i, ep := range c.endpoints {
		if ep == endpoint {
			return true
		}
	}
	return false
}

func (c *connectionMu) GetEndpointLen() int {
	return len(c.endpoints)
}

@@ -31,48 +35,188 @@ func NewMetaClient(endpoints []string, opts ...Option) (MetaClient, error) {

func (c *defaultMetaClient) Open() error {
return c.defaultClient().initDriver(func(driver types.Driver) error {
return c.meta.open(driver)
n := len(c.meta.connection.endpoints)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return c.openRetry(driver)

@kqzh kqzh force-pushed the fix_change_leader branch 2 times, most recently from 2fd1494 to fd33c4a Compare March 16, 2022 09:24
})

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it?

}
// check if leader change
if resp.GetCode() == nerrors.ErrorCode_E_LEADER_CHANGED {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it?

if err = c.updateLeader(resp.GetLeader()); err != nil {
return err
}
_, _ = fn()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ignore this error, but do not ignore it in line 144?

func (c *defaultMetaClient) openRetry(driver types.Driver) error {
n := c.meta.connection.GetEndpointsLen()
for i := 0; i < n; i++ {
c.meta.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use _ = c.meta.close() ?

if err := c.meta.open(driver); err == nil {
return nil
}
c.meta.connection.UpdateNextIndex()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's need to update the next index when c.meta.open successfully?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if c.meta.open success, will return nil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the next time it fails, it will retry the one that has already failed.

@kqzh kqzh force-pushed the fix_change_leader branch from fd33c4a to 7a38f55 Compare March 16, 2022 10:11
@kqzh kqzh force-pushed the fix_change_leader branch from 7a38f55 to e27892e Compare March 16, 2022 10:42
Copy link
Contributor

@veezhang veezhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@veezhang veezhang merged commit 3e7907d into vesoft-inc:master Mar 16, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants