fix(go-client): update config once replica server failed and forward to primary meta server if it was changed #1916

Merged
merged 8 commits
Jun 20, 2024
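
This PR makes two changes to the Go client: a replica-side context.DeadlineExceeded error now triggers a table-configuration update, and when a contacted meta server answers with ERR_FORWARD_TO_OTHERS, the client adds the forwarded primary address to its meta list and retries against it. Below is a minimal, self-contained sketch of the forwarding idea only; the types and the query helper are hypothetical stand-ins, not the pegasus go-client API.

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-in for a meta session; the real client uses *metaSession
// and base.RPCAddress from the pegasus go-client.
type metaServer struct {
	addr    string
	primary string // address of the actual primary; empty if this server is the primary
}

var errForwardToOthers = errors.New("ERR_FORWARD_TO_OTHERS")

// query simulates a config query: a non-primary meta replies with a forward hint.
func (m *metaServer) query() (forwardAddr string, err error) {
	if m.primary != "" {
		return m.primary, errForwardToOthers
	}
	return "", nil
}

// callMeta mirrors the idea of issueSingleMeta in the diff below: if the contacted
// meta asks us to forward and the target is not yet in the address list, append it and retry.
func callMeta(metas []*metaServer, addrs []string, lead int) ([]*metaServer, []string, int, error) {
	forward, err := metas[lead].query()
	if errors.Is(err, errForwardToOthers) && forward != "" {
		known := false
		for _, a := range addrs {
			if a == forward {
				known = true
				break
			}
		}
		if !known {
			addrs = append(addrs, forward)
			metas = append(metas, &metaServer{addr: forward})
			lead = len(metas) - 1
			_, err = metas[lead].query()
		}
	}
	return metas, addrs, lead, err
}

func main() {
	// The client only knows one (non-primary) meta; the real primary is elsewhere.
	metas := []*metaServer{{addr: "0.0.0.0:34602", primary: "0.0.0.0:34601"}}
	addrs := []string{"0.0.0.0:34602"}

	metas, addrs, lead, err := callMeta(metas, addrs, 0)
	fmt.Println(addrs, lead, err) // [0.0.0.0:34602 0.0.0.0:34601] 1 <nil>
}
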
1 change: 1 addition & 0 deletions go-client/pegasus/table_connector.go
@@ -703,6 +703,7 @@ func (p *pegasusTableConnector) handleReplicaError(err error, replica *session.R

case base.ERR_TIMEOUT:
case context.DeadlineExceeded:
confUpdate = true
case context.Canceled:
// timeout will not trigger a configuration update

8 changes: 7 additions & 1 deletion go-client/pegasus/table_connector_test.go
@@ -269,8 +269,14 @@ func TestPegasusTableConnector_TriggerSelfUpdate(t *testing.T) {
assert.True(t, confUpdate)
assert.False(t, retry)

confUpdate, retry, err = ptb.handleReplicaError(context.DeadlineExceeded, nil)
<-ptb.confUpdateCh
assert.Error(t, err)
assert.True(t, confUpdate)
assert.False(t, retry)

{ // Ensure: The following errors should not trigger configuration update
errorTypes := []error{base.ERR_TIMEOUT, context.DeadlineExceeded, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}
errorTypes := []error{base.ERR_TIMEOUT, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}

for _, err := range errorTypes {
channelEmpty := false
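
The test above drains ptb.confUpdateCh after handleReplicaError reports confUpdate = true. A minimal standalone sketch of that trigger pattern follows, assuming the connector coalesces repeated triggers through a one-slot channel (the real connector's mechanism may differ):

package main

import "fmt"

// confUpdateCh acts as a one-slot signal: many failed requests collapse into a
// single pending configuration-update request.
var confUpdateCh = make(chan struct{}, 1)

// tryConfUpdate asks for a table-configuration refresh without blocking the caller.
func tryConfUpdate() {
	select {
	case confUpdateCh <- struct{}{}:
	default: // an update is already pending; drop the duplicate trigger
	}
}

func main() {
	tryConfUpdate()
	tryConfUpdate() // coalesced with the first trigger

	select {
	case <-confUpdateCh:
		fmt.Println("refreshing table configuration from the meta server")
	default:
		fmt.Println("no update pending")
	}
}
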
67 changes: 55 additions & 12 deletions go-client/session/meta_call.go
@@ -26,6 +26,8 @@ import (
"time"

"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegalog"
)

type metaCallFunc func(context.Context, *metaSession) (metaResponse, error)
@@ -42,21 +44,23 @@ type metaCall struct {
backupCh chan interface{}
callFunc metaCallFunc

metas []*metaSession
lead int
metaIPAddrs []string
metas []*metaSession
lead int
// After a Run successfully ends, the current leader will be set in this field.
// If there is no meta failover, `newLead` equals `lead`.
newLead uint32
}

func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc) *metaCall {
func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc, metaIPAddrs []string) *metaCall {
return &metaCall{
metas: metas,
lead: lead,
newLead: uint32(lead),
respCh: make(chan metaResponse),
callFunc: callFunc,
backupCh: make(chan interface{}),
metas: metas,
metaIPAddrs: metaIPAddrs,
lead: lead,
newLead: uint32(lead),
respCh: make(chan metaResponse),
callFunc: callFunc,
backupCh: make(chan interface{}),
}
}

@@ -106,14 +110,40 @@ func (c *metaCall) Run(ctx context.Context) (metaResponse, error) {
}

// issueSingleMeta returns false if we should try another meta
func (c *metaCall) issueSingleMeta(ctx context.Context, i int) bool {
meta := c.metas[i]
func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool {
meta := c.metas[curLeader]
resp, err := c.callFunc(ctx, meta)

if err == nil && resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() {
forwardAddr := c.getMetaServiceForwardAddress(resp)
if forwardAddr == nil {
return false
}
addr := forwardAddr.GetAddress()
found := false
for i := range c.metaIPAddrs {
if addr == c.metaIPAddrs[i] {
found = true
break
}
}
if !found {
c.metaIPAddrs = append(c.metaIPAddrs, addr)
c.metas = append(c.metas, &metaSession{
NodeSession: newNodeSession(addr, NodeTypeMeta),
logger: pegalog.GetLogger(),
})
curLeader = len(c.metas) - 1
c.metas[curLeader].logger.Printf("add forward address %s as meta server", addr)
resp, err = c.callFunc(ctx, c.metas[curLeader])
}
}

if err != nil || resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() {
return false
}
// the RPC succeeds, this meta becomes the new leader now.
atomic.StoreUint32(&c.newLead, uint32(i))
atomic.StoreUint32(&c.newLead, uint32(curLeader))
select {
case <-ctx.Done():
case c.respCh <- resp:
@@ -133,3 +163,16 @@ func (c *metaCall) issueBackupMetas(ctx context.Context) {
}(i)
}
}

func (c *metaCall) getMetaServiceForwardAddress(resp metaResponse) *base.RPCAddress {
rep, ok := resp.(*replication.QueryCfgResponse)
if !ok || rep.GetErr().Errno != base.ERR_FORWARD_TO_OTHERS.String() {
return nil
} else if len(rep.GetPartitions()) == 0 {
return nil
} else {
return rep.Partitions[0].Primary
}
}
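
As the struct comment in meta_call.go notes, whichever meta session answers successfully publishes its index through the atomic newLead field, and the manager adopts it after Run returns. A minimal standalone sketch of that pattern follows; the names and the hard-coded "winning" session are illustrative, not the client's exact behavior.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const currentLead = 0
	var newLead uint32 = currentLead // published by whichever session answers successfully

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			// Pretend only session 2 (the actual primary) answers successfully.
			if idx == 2 {
				atomic.StoreUint32(&newLead, uint32(idx))
			}
		}(i)
	}
	wg.Wait()

	// Once the call finishes, the manager switches its current leader to this index.
	fmt.Println("new leader index:", atomic.LoadUint32(&newLead))
}
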
18 changes: 17 additions & 1 deletion go-client/session/meta_session.go
@@ -94,10 +94,12 @@ func NewMetaManager(addrs []string, creator NodeSessionCreator) *MetaManager {

func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) (metaResponse, error) {
lead := m.getCurrentLeader()
call := newMetaCall(lead, m.metas, callFunc)
call := newMetaCall(lead, m.metas, callFunc, m.metaIPAddrs)
resp, err := call.Run(ctx)
if err == nil {
m.setCurrentLeader(int(call.newLead))
m.setNewMetas(call.metas)
m.setMetaIPAddrs(call.metaIPAddrs)
}
return resp, err
}
@@ -131,6 +133,20 @@ func (m *MetaManager) setCurrentLeader(lead int) {
m.currentLeader = lead
}

func (m *MetaManager) setNewMetas(metas []*metaSession) {
m.mu.Lock()
defer m.mu.Unlock()

m.metas = metas
}

func (m *MetaManager) setMetaIPAddrs(metaIPAddrs []string) {
m.mu.Lock()
defer m.mu.Unlock()

m.metaIPAddrs = metaIPAddrs
}

// Close the sessions.
func (m *MetaManager) Close() error {
funcs := make([]func() error, len(m.metas))
19 changes: 18 additions & 1 deletion go-client/session/meta_session_test.go
@@ -118,11 +118,28 @@ func TestMetaManager_FirstMetaDead(t *testing.T) {
for i := 0; i < 3; i++ {
call := newMetaCall(mm.currentLeader, mm.metas, func(rpcCtx context.Context, ms *metaSession) (metaResponse, error) {
return ms.queryConfig(rpcCtx, "temp")
})
}, []string{"0.0.0.0:12345", "0.0.0.0:34603", "0.0.0.0:34602", "0.0.0.0:34601"})
// This is a trick for testing: if metaCall issues the request to other metas, not only to the leader, this nil channel will cause a panic.
call.backupCh = nil
metaResp, err := call.Run(context.Background())
assert.Nil(t, err)
assert.Equal(t, metaResp.GetErr().Errno, base.ERR_OK.String())
}
}

// This case mocks the situation where the server's primary meta is not in the client's meta list.
Reviewer (Member): The meta servers in the test are 0.0.0.0:3460{1..3}, which one is "not in the client metalist"?

Author (Collaborator): When onebox starts, the primary meta server is randomized. Therefore, a loop is used, and only one meta server is passed to the go client each time. This ensures that redirection is required twice in the loop.

// And the client will forward to the primary meta automatically.
func TestNodeSession_ForwardToPrimaryMeta(t *testing.T) {
defer leaktest.Check(t)()

metaList := []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"}

for i := 0; i < 3; i++ {
mm := NewMetaManager(metaList[i:i+1], NewNodeSession)
defer mm.Close()
resp, err := mm.QueryConfig(context.Background(), "temp")
println(resp)
assert.Nil(t, err)
assert.Equal(t, resp.Err.Errno, base.ERR_OK.String())
}
}