fix(go-client): update config once replica server failed and forward to primary meta server if it was changed (apache#1916)

apache#1880
apache#1856

As for apache#1856:
When the go client is writing to a partition and the replica node core dumps, the request only
finishes after a timeout, and the table configuration is not updated. In that case, the only way to
recover is to restart the go client.

In this PR, the client updates the table configuration automatically when a replica core dumps.
Testing showed that the error the client observes in this case is "context.DeadlineExceeded"
(see incubator-pegasus/go-client/pegasus/table_connector.go).

Therefore, when the client encounters this error, it updates the configuration automatically.
The failed request is not retried: the configuration is only updated after the timeout, so a retry
issued before the update would still fail, and unconditional retrying risks an infinite retry loop.
It is better to return the error to the user directly and let the user try again.
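
A minimal, runnable sketch of that decision (illustrative only, not the committed code; the real
logic lives in handleReplicaError in table_connector.go):

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    // classifyReplicaError is a hypothetical stand-in for the decision made in
    // handleReplicaError: confUpdate says whether to refresh the table
    // configuration, retry says whether to re-send the failed request.
    func classifyReplicaError(err error) (confUpdate, retry bool) {
        switch {
        case errors.Is(err, context.DeadlineExceeded):
            // Observed when a replica core dumps: refresh the configuration,
            // but return the error instead of retrying -- a retry issued
            // before the refresh completes would hit the same dead replica.
            return true, false
        case errors.Is(err, context.Canceled):
            // The caller canceled the request; nothing to refresh or retry.
            return false, false
        default:
            return false, false
        }
    }

    func main() {
        confUpdate, retry := classifyReplicaError(context.DeadlineExceeded)
        fmt.Printf("confUpdate=%v retry=%v\n", confUpdate, retry) // confUpdate=true retry=false
    }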

As for apache#1880:
When the client sends the RPC "RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX" to a meta server that is
not the primary, that meta server replies with ERR_FORWARD_TO_OTHERS, and the response carries
the address of the primary meta server to forward to.

Based on this behavior, even when the primary meta server is not in the client's configured meta
list, the client can discover it from the forwarding response and connect to it, as sketched below.
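
A self-contained sketch of extracting the forward address, with minimal stand-in types for the
real idl/base and idl/replication structs (all names here are illustrative, not the client's
actual API; the committed version is getMetaServiceForwardAddress in meta_call.go):

    package main

    import "fmt"

    // Stand-ins for the real idl types; illustrative only.
    type rpcAddress struct{ addr string }

    func (r *rpcAddress) GetAddress() string { return r.addr }

    type partitionConfig struct{ Primary *rpcAddress }

    type queryCfgResponse struct {
        Errno      string
        Partitions []partitionConfig
    }

    // forwardAddress mirrors the idea behind getMetaServiceForwardAddress:
    // when a non-primary meta answers ERR_FORWARD_TO_OTHERS, the primary's
    // address is carried in Partitions[0].Primary.
    func forwardAddress(resp *queryCfgResponse) *rpcAddress {
        if resp.Errno != "ERR_FORWARD_TO_OTHERS" || len(resp.Partitions) == 0 {
            return nil
        }
        return resp.Partitions[0].Primary
    }

    func main() {
        resp := &queryCfgResponse{
            Errno:      "ERR_FORWARD_TO_OTHERS",
            Partitions: []partitionConfig{{Primary: &rpcAddress{addr: "10.0.0.1:34601"}}},
        }
        if fwd := forwardAddress(resp); fwd != nil {
            // The client appends this address to its meta list and
            // reissues the query against the primary.
            fmt.Println("dial primary meta at", fwd.GetAddress())
        }
    }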

About tests:
1. Start onebox, with the primary meta server not added to the go client configuration.
2. The go client writes data to a partition, and then the replica process serving it is killed.
lengyuexuexuan authored and lupengfan1 committed Jul 17, 2024
1 parent 1ea072e commit eb4425c
Showing 5 changed files with 100 additions and 15 deletions.
1 change: 1 addition & 0 deletions go-client/pegasus/table_connector.go
@@ -703,6 +703,7 @@ func (p *pegasusTableConnector) handleReplicaError(err error, replica *session.R

case base.ERR_TIMEOUT:
case context.DeadlineExceeded:
confUpdate = true
case context.Canceled:
// timeout will not trigger a configuration update

8 changes: 7 additions & 1 deletion go-client/pegasus/table_connector_test.go
@@ -269,8 +269,14 @@ func TestPegasusTableConnector_TriggerSelfUpdate(t *testing.T) {
assert.True(t, confUpdate)
assert.False(t, retry)

confUpdate, retry, err = ptb.handleReplicaError(context.DeadlineExceeded, nil)
<-ptb.confUpdateCh
assert.Error(t, err)
assert.True(t, confUpdate)
assert.False(t, retry)

{ // Ensure: The following errors should not trigger configuration update
errorTypes := []error{base.ERR_TIMEOUT, context.DeadlineExceeded, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}
errorTypes := []error{base.ERR_TIMEOUT, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}

for _, err := range errorTypes {
channelEmpty := false
70 changes: 58 additions & 12 deletions go-client/session/meta_call.go
@@ -26,6 +26,8 @@ import (
"time"

"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegalog"
)

type metaCallFunc func(context.Context, *metaSession) (metaResponse, error)
@@ -42,21 +44,24 @@ type metaCall struct {
backupCh chan interface{}
callFunc metaCallFunc

metas []*metaSession
lead int
metaIPAddrs []string
metas []*metaSession
lead int
// After a Run successfully ends, the current leader will be set in this field.
// If there is no meta failover, `newLead` equals to `lead`.
newLead uint32
lock sync.RWMutex
}

func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc) *metaCall {
func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc, meatIPAddr []string) *metaCall {
return &metaCall{
metas: metas,
lead: lead,
newLead: uint32(lead),
respCh: make(chan metaResponse),
callFunc: callFunc,
backupCh: make(chan interface{}),
metas: metas,
metaIPAddrs: meatIPAddr,
lead: lead,
newLead: uint32(lead),
respCh: make(chan metaResponse),
callFunc: callFunc,
backupCh: make(chan interface{}),
}
}

@@ -106,14 +111,44 @@ func (c *metaCall) Run(ctx context.Context) (metaResponse, error) {
}

// issueSingleMeta returns false if we should try another meta
func (c *metaCall) issueSingleMeta(ctx context.Context, i int) bool {
meta := c.metas[i]
func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool {
meta := c.metas[curLeader]
resp, err := c.callFunc(ctx, meta)

if err == nil && resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() {
forwardAddr := c.getMetaServiceForwardAddress(resp)
if forwardAddr == nil {
return false
}
addr := forwardAddr.GetAddress()
found := false
c.lock.Lock()
for i := range c.metaIPAddrs {
if addr == c.metaIPAddrs[i] {
found = true
break
}
}
c.lock.Unlock()
if !found {
c.lock.Lock()
c.metaIPAddrs = append(c.metaIPAddrs, addr)
c.metas = append(c.metas, &metaSession{
NodeSession: newNodeSession(addr, NodeTypeMeta),
logger: pegalog.GetLogger(),
})
c.lock.Unlock()
curLeader = len(c.metas) - 1
c.metas[curLeader].logger.Printf("add forward address %s as meta server", addr)
resp, err = c.callFunc(ctx, c.metas[curLeader])
}
}

if err != nil || resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() {
return false
}
// the RPC succeeds, this meta becomes the new leader now.
atomic.StoreUint32(&c.newLead, uint32(i))
atomic.StoreUint32(&c.newLead, uint32(curLeader))
select {
case <-ctx.Done():
case c.respCh <- resp:
Expand All @@ -133,3 +168,14 @@ func (c *metaCall) issueBackupMetas(ctx context.Context) {
}(i)
}
}

func (c *metaCall) getMetaServiceForwardAddress(resp metaResponse) *base.RPCAddress {
rep, ok := resp.(*replication.QueryCfgResponse)
if !ok || rep.GetErr().Errno != base.ERR_FORWARD_TO_OTHERS.String() {
return nil
} else if rep.GetPartitions() == nil || len(rep.GetPartitions()) == 0 {
return nil
} else {
return rep.Partitions[0].Primary
}
}
18 changes: 17 additions & 1 deletion go-client/session/meta_session.go
@@ -94,10 +94,12 @@ func NewMetaManager(addrs []string, creator NodeSessionCreator) *MetaManager {

func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) (metaResponse, error) {
lead := m.getCurrentLeader()
call := newMetaCall(lead, m.metas, callFunc)
call := newMetaCall(lead, m.metas, callFunc, m.metaIPAddrs)
resp, err := call.Run(ctx)
if err == nil {
m.setCurrentLeader(int(call.newLead))
m.setNewMetas(call.metas)
m.setMetaIPAddrs(call.metaIPAddrs)
}
return resp, err
}
@@ -131,6 +133,20 @@ func (m *MetaManager) setCurrentLeader(lead int) {
m.currentLeader = lead
}

func (m *MetaManager) setNewMetas(metas []*metaSession) {
m.mu.Lock()
defer m.mu.Unlock()

m.metas = metas
}

func (m *MetaManager) setMetaIPAddrs(metaIPAddrs []string) {
m.mu.Lock()
defer m.mu.Unlock()

m.metaIPAddrs = metaIPAddrs
}

// Close the sessions.
func (m *MetaManager) Close() error {
funcs := make([]func() error, len(m.metas))
18 changes: 17 additions & 1 deletion go-client/session/meta_session_test.go
@@ -118,11 +118,27 @@ func TestMetaManager_FirstMetaDead(t *testing.T) {
for i := 0; i < 3; i++ {
call := newMetaCall(mm.currentLeader, mm.metas, func(rpcCtx context.Context, ms *metaSession) (metaResponse, error) {
return ms.queryConfig(rpcCtx, "temp")
})
}, []string{"0.0.0.0:12345", "0.0.0.0:34603", "0.0.0.0:34602", "0.0.0.0:34601"})
// This is a trick for testing. If metaCall issues the RPC to other metas, not only to the leader, this nil channel will cause a panic.
call.backupCh = nil
metaResp, err := call.Run(context.Background())
assert.Nil(t, err)
assert.Equal(t, metaResp.GetErr().Errno, base.ERR_OK.String())
}
}

// This case mocks the scenario where the server's primary meta is not in the client's meta list.
// The client should forward to the primary meta automatically.
func TestNodeSession_ForwardToPrimaryMeta(t *testing.T) {
defer leaktest.Check(t)()

metaList := []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"}

for i := 0; i < 3; i++ {
mm := NewMetaManager(metaList[i:i+1], NewNodeSession)
defer mm.Close()
resp, err := mm.QueryConfig(context.Background(), "temp")
assert.Nil(t, err)
assert.Equal(t, resp.Err.Errno, base.ERR_OK.String())
}
}
