Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add meta retry #94

Merged
merged 1 commit into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ccore/nebula/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (c *defaultClient) initDriver(checkFn func(types.Driver) error) error {
c.driver = driver
return nil
}

for _, v := range c.o.autoVersions {
driver, err := types.GetDriver(v)
if err != nil {
Expand Down
148 changes: 126 additions & 22 deletions ccore/nebula/client_meta.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package nebula

import (
"github.com/facebook/fbthrift/thrift/lib/go/thrift"

nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types"
)

type (
MetaClient interface {
Open() error
AddHosts(endpoints []string) error
DropHosts(endpoints []string) error
AddHosts(endpoints []string) (types.MetaBaser, error)
DropHosts(endpoints []string) (types.MetaBaser, error)
ListSpaces() (types.Spaces, error)
BalanceData(space string) (types.Balancer, error)
BalanceLeader(space string) (types.Balancer, error)
Expand All @@ -31,48 +34,149 @@ func NewMetaClient(endpoints []string, opts ...Option) (MetaClient, error) {

func (c *defaultMetaClient) Open() error {
return c.defaultClient().initDriver(func(driver types.Driver) error {
return c.meta.open(driver)
return c.openRetry(driver)
})
}

func (c *defaultMetaClient) Close() error {
return c.meta.close()
}

func (c *defaultMetaClient) AddHosts(endpoints []string) error {
return c.meta.AddHosts(endpoints)
func (c *defaultMetaClient) AddHosts(endpoints []string) (resp types.MetaBaser, err error) {
retryErr := c.retryDo(func() (types.MetaBaser, error) {
resp, err = c.meta.AddHosts(endpoints)
return resp, err
})
if retryErr != nil {
return nil, retryErr
}

return
}

func (c *defaultMetaClient) DropHosts(endpoints []string) error {
return c.meta.DropHosts(endpoints)
func (c *defaultMetaClient) DropHosts(endpoints []string) (resp types.MetaBaser, err error) {
retryErr := c.retryDo(func() (types.MetaBaser, error) {
resp, err = c.meta.DropHosts(endpoints)
return resp, err
})
if retryErr != nil {
return nil, retryErr
}

return
}

func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) {
return c.meta.ListSpaces()
func (c *defaultMetaClient) ListSpaces() (resp types.Spaces, err error) {
retryErr := c.retryDo(func() (types.MetaBaser, error) {
resp, err = c.meta.ListSpaces()
return resp, err
})
if retryErr != nil {
return nil, retryErr
}

return
}

func (c *defaultMetaClient) BalanceData(space string) (types.Balancer, error) {
return c.meta.Balance(types.BalanceReq{
Cmd: types.BalanceData,
Space: space,
func (c *defaultMetaClient) BalanceData(space string) (resp types.Balancer, err error) {
retryErr := c.retryDo(func() (types.MetaBaser, error) {
resp, err = c.meta.Balance(types.BalanceReq{
Cmd: types.BalanceData,
Space: space,
})
return resp, err
})
if retryErr != nil {
return nil, retryErr
}

return
}

func (c *defaultMetaClient) BalanceLeader(space string) (types.Balancer, error) {
return c.meta.Balance(types.BalanceReq{
Cmd: types.BalanceLeader,
Space: space,
func (c *defaultMetaClient) BalanceLeader(space string) (resp types.Balancer, err error) {
retryErr := c.retryDo(func() (types.MetaBaser, error) {
resp, err = c.meta.Balance(types.BalanceReq{
Cmd: types.BalanceLeader,
Space: space,
})
return resp, err
})
if retryErr != nil {
return nil, retryErr
}

return
}

func (c *defaultMetaClient) BalanceDataRemove(space string, endpoints []string) (types.Balancer, error) {
return c.meta.Balance(types.BalanceReq{
Cmd: types.BalanceDataRemove,
Space: space,
HostsToRemove: endpoints,
func (c *defaultMetaClient) BalanceDataRemove(space string, endpoints []string) (resp types.Balancer, err error) {
retryErr := c.retryDo(func() (types.MetaBaser, error) {
resp, err = c.meta.Balance(types.BalanceReq{
Cmd: types.BalanceDataRemove,
Space: space,
HostsToRemove: endpoints,
})
return resp, err
})

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it?

if retryErr != nil {
return nil, retryErr
}

return
}

func (c *defaultMetaClient) defaultClient() *defaultClient {
return (*defaultClient)(c)
}

func (c *defaultMetaClient) retryDo(fn func() (types.MetaBaser, error)) error {
resp, err := fn()
if err != nil {
// check if transport exception
if err = c.reconnect(err); err != nil {
return err
}
resp, err = fn()
if err != nil {
return err
}
}
// check if leader change
if resp.GetCode() == nerrors.ErrorCode_E_LEADER_CHANGED {
if err = c.updateLeader(resp.GetLeader()); err != nil {
return err
}
if _, err = fn(); err != nil {
return err
}
}
return nil
}

func (c *defaultMetaClient) updateLeader(endpoint string) error {
if err := c.meta.connection.SetEndpointIfExists(endpoint); err != nil {
return err
}
return c.openRetry(c.driver)
}

func (c *defaultMetaClient) reconnect(err error) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (c *defaultMetaClient) reconnect(err error) error {
	if _, ok := err.(thrift.TransportException); !ok {
		return err
	}
	return c.openRetry(c.driver)
}

func (c *defaultMetaClient) openRetry(driver types.Driver) error {
	for i := 0; i < c.meta.connection.GetEndpointLen(); i++ {
		c.meta.connection.UpdateNextEndpoint()
		if err := c.meta.open(driver); err != nil {
			if _, ok := err.(thrift.TransportException); !ok {
			}
			return err
		}
	}
	return nil //error
}

if _, ok := err.(thrift.TransportException); !ok {
return err
}
return c.openRetry(c.driver)
}

func (c *defaultMetaClient) openRetry(driver types.Driver) error {
n := c.meta.connection.GetEndpointsLen()
for i := 0; i < n; i++ {
_ = c.meta.close()

err := c.meta.open(driver)
c.meta.connection.UpdateNextIndex() // update nextIndex every time
if err == nil {
return nil
}
}
return nerrors.ErrNoValidMetaEndpoint
}
31 changes: 25 additions & 6 deletions ccore/nebula/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
o *socketOptions
mu sync.Mutex
endpoints []string
nextIndex int
}

driverFactory struct {
Expand Down Expand Up @@ -117,10 +118,6 @@ func (d *driverGraph) close() error {
}

func (d *driverMeta) open(driver types.Driver) error {
if d.MetaClientDriver != nil {
return nil
}

Comment on lines -120 to -123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove?

transport, pf, err := d.connection.connect()
if err != nil {
return err
Expand Down Expand Up @@ -179,11 +176,11 @@ func (d *driverStorageAdmin) close() error {
}

func (c *connectionMu) connect() (thrift.Transport, thrift.ProtocolFactory, error) {
// TODO: automatically open until success, only the first endpoints is supported now.
if len(c.endpoints) == 0 {
return nil, nil, nerrors.ErrNoEndpoints
}
return c.buildThriftTransport(c.endpoints[0])

return c.buildThriftTransport(c.endpoints[c.nextIndex])
}

func (c *connectionMu) buildThriftTransport(endpoint string) (thrift.Transport, thrift.ProtocolFactory, error) {
Expand All @@ -203,3 +200,25 @@ func (c *connectionMu) buildThriftTransport(endpoint string) (thrift.Transport,

return transport, pf, nil
}

func (c *connectionMu) UpdateNextIndex() {
c.mu.Lock()
defer c.mu.Unlock()
c.nextIndex = (c.nextIndex + 1) % c.GetEndpointsLen()
}

func (c *connectionMu) SetEndpointIfExists(endpoint string) error {
c.mu.Lock()
defer c.mu.Unlock()
for i, ep := range c.endpoints {
if ep == endpoint {
c.nextIndex = i
return nil
}
}
return nerrors.ErrUnknownMetaEndpoint
}

func (c *connectionMu) GetEndpointsLen() int {
return len(c.endpoints)
}
10 changes: 6 additions & 4 deletions ccore/nebula/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package errors
import "errors"

var (
ErrUnsupportedVersion = errors.New("unsupported version")
ErrUnsupported = errors.New("unsupported")
ErrNoEndpoints = errors.New("no endpoints")
ErrNoJobStats = errors.New("no job stats")
ErrUnsupportedVersion = errors.New("unsupported version")
ErrUnsupported = errors.New("unsupported")
ErrNoEndpoints = errors.New("no endpoints")
ErrNoJobStats = errors.New("no job stats")
ErrUnknownMetaEndpoint = errors.New("unknown meta endpoint to update connection")
ErrNoValidMetaEndpoint = errors.New("no valid meta endpoint to connect")
)
10 changes: 5 additions & 5 deletions ccore/nebula/internal/driver/v2_5/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func (c *defaultMetaClient) Close() error {
return nil
}

func (c *defaultMetaClient) AddHosts(endpoints []string) error {
return nerrors.ErrUnsupported
func (c *defaultMetaClient) AddHosts(endpoints []string) (types.MetaBaser, error) {
return nil, nerrors.ErrUnsupported
}

func (c *defaultMetaClient) DropHosts(endpoints []string) error {
return nerrors.ErrUnsupported
func (c *defaultMetaClient) DropHosts(endpoints []string) (types.MetaBaser, error) {
return nil, nerrors.ErrUnsupported
}

func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) {
Expand All @@ -62,7 +62,7 @@ func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) {
return nil, err
}

return newSpacesWrapper(resp.Spaces), nil
return newSpacesWrapper(resp), nil
}

func (c *defaultMetaClient) Balance(req types.BalanceReq) (types.Balancer, error) {
Expand Down
47 changes: 40 additions & 7 deletions ccore/nebula/internal/driver/v2_5/wrapper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package v2_5

import (
"fmt"

nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors"
nthrift "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5"
"github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/internal/thrift/v2_5/graph"
Expand Down Expand Up @@ -1142,14 +1144,45 @@ type spaceWrapper struct {
Space *meta.IdName
}

func newSpacesWrapper(spaces []*meta.IdName) types.Spaces {
s := make([]types.Space, 0, len(spaces))
for _, space := range spaces {
s = append(s, spaceWrapper{Space: space})
func (w spaceWrapper) GetName() string {
return string(w.Space.GetName())
}

type spacesWrap struct {
metaBaserWrap
Spaces []types.Space
}

func (w spacesWrap) GetSpaces() []types.Space {
return w.Spaces
}

func newSpacesWrapper(resp *meta.ListSpacesResp) types.Spaces {
list := make([]types.Space, 0, len(resp.GetSpaces()))
for _, space := range resp.GetSpaces() {
list = append(list, spaceWrapper{Space: space})
}
return spacesWrap{
metaBaserWrap: metaBaserWrap{
code: nerrors.ErrorCode(resp.GetCode()),
leader: types.HostAddr{
Host: resp.GetLeader().GetHost(),
Port: resp.GetLeader().GetPort(),
},
},
Spaces: list,
}
return s
}

func (w spaceWrapper) GetName() string {
return string(w.Space.GetName())
type metaBaserWrap struct {
code nerrors.ErrorCode
leader types.HostAddr
}

func (m metaBaserWrap) GetCode() nerrors.ErrorCode {
return m.code
}

func (m metaBaserWrap) GetLeader() string {
return fmt.Sprintf("%s:%d", m.leader.Host, m.leader.Port)
}
10 changes: 5 additions & 5 deletions ccore/nebula/internal/driver/v2_6/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func (c *defaultMetaClient) Close() error {
return nil
}

func (c *defaultMetaClient) AddHosts(endpoints []string) error {
return nerrors.ErrUnsupported
func (c *defaultMetaClient) AddHosts(endpoints []string) (types.MetaBaser, error) {
return nil, nerrors.ErrUnsupported
}

func (c *defaultMetaClient) DropHosts(endpoints []string) error {
return nerrors.ErrUnsupported
func (c *defaultMetaClient) DropHosts(endpoints []string) (types.MetaBaser, error) {
return nil, nerrors.ErrUnsupported
}

func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) {
Expand All @@ -66,7 +66,7 @@ func (c *defaultMetaClient) ListSpaces() (types.Spaces, error) {
return nil, err
}

return newSpacesWrapper(resp.Spaces), nil
return newSpacesWrapper(resp), nil
}

func (c *defaultMetaClient) Balance(req types.BalanceReq) (types.Balancer, error) {
Expand Down
Loading