Skip to content

Commit

Permalink
feat(go-client): add ListNodes as admin API for meta (apache#1939)
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan authored Mar 11, 2024
1 parent 7a5593f commit 2bcdaf5
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 9 deletions.
30 changes: 30 additions & 0 deletions go-client/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type Client interface {
// Empty `args` means "list all available tables"; Otherwise, the only parameter would
// specify the status of the returned tables.
ListTables(args ...interface{}) ([]*replication.AppInfo, error)

// Empty `args` means "list all alive nodes"; Otherwise, the only parameter would
// specify the status of the returned nodes.
ListNodes(args ...interface{}) ([]*admin.NodeInfo, error)
}

type Config struct {
Expand Down Expand Up @@ -234,3 +238,29 @@ func (c *rpcBasedClient) ListTables(args ...interface{}) ([]*replication.AppInfo
}
return c.listTables(args[0].(replication.AppStatus))
}

func (c *rpcBasedClient) listNodes(status admin.NodeStatus) ([]*admin.NodeInfo, error) {
req := &admin.ConfigurationListNodesRequest{
Status: status,
}

var nodes []*admin.NodeInfo
var respErr error
err := c.callMeta("ListNodes", req, func(iresp interface{}) {
resp := iresp.(*admin.ConfigurationListNodesResponse)
nodes = resp.Infos
respErr = base.GetResponseError(resp)
})
if err != nil {
return nodes, err
}

return nodes, respErr
}

func (c *rpcBasedClient) ListNodes(args ...interface{}) ([]*admin.NodeInfo, error) {
if len(args) == 0 {
return c.listNodes(admin.NodeStatus_NS_ALIVE)
}
return c.listNodes(args[0].(admin.NodeStatus))
}
55 changes: 49 additions & 6 deletions go-client/admin/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

"github.com/apache/incubator-pegasus/go-client/idl/admin"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegasus"
"github.com/stretchr/testify/assert"
Expand All @@ -43,6 +44,22 @@ func defaultConfig() Config {
}
}

func defaultReplicaServerPorts() []int {
return []int{34801, 34802, 34803}
}

func timeoutConfig() Config {
return Config{
MetaServers: []string{"0.0.0.0:123456"},
Timeout: 500 * time.Millisecond,
}
}

func testAdmin_Timeout(t *testing.T, exec func(c Client) error) {
c := NewClient(timeoutConfig())
assert.Equal(t, context.DeadlineExceeded, exec(c))
}

func TestAdmin_Table(t *testing.T) {
c := NewClient(defaultConfig())

Expand Down Expand Up @@ -75,13 +92,10 @@ func TestAdmin_Table(t *testing.T) {
}

func TestAdmin_ListTablesTimeout(t *testing.T) {
c := NewClient(Config{
MetaServers: []string{"0.0.0.0:123456"},
Timeout: 500 * time.Millisecond,
testAdmin_Timeout(t, func(c Client) error {
_, err := c.ListTables()
return err
})

_, err := c.ListTables()
assert.Equal(t, err, context.DeadlineExceeded)
}

// Ensures after the call `CreateTable` ends, the table must be right available to access.
Expand Down Expand Up @@ -145,3 +159,32 @@ func TestAdmin_GetAppEnvs(t *testing.T) {
assert.Empty(t, tb.Envs)
}
}

func TestAdmin_ListNodes(t *testing.T) {
c := NewClient(defaultConfig())

nodes, err := c.ListNodes()
assert.Nil(t, err)

expectedReplicaServerPorts := defaultReplicaServerPorts()

// Compare slice length.
assert.Equal(t, len(expectedReplicaServerPorts), len(nodes))

actualReplicaServerPorts := make([]int, len(nodes))
for i, node := range nodes {
// Each node should be alive.
assert.Equal(t, admin.NodeStatus_NS_ALIVE, node.Status)
actualReplicaServerPorts[i] = node.Address.GetPort()
}

// Match elements without extra ordering.
assert.ElementsMatch(t, expectedReplicaServerPorts, actualReplicaServerPorts)
}

func TestAdmin_ListNodesTimeout(t *testing.T) {
testAdmin_Timeout(t, func(c Client) error {
_, err := c.ListNodes()
return err
})
}
6 changes: 3 additions & 3 deletions go-client/idl/base/rpc_address.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ func (r *RPCAddress) String() string {
return fmt.Sprintf("RPCAddress(%s)", r.GetAddress())
}

func (r *RPCAddress) getIp() net.IP {
func (r *RPCAddress) GetIP() net.IP {
return net.IPv4(byte(0xff&(r.address>>56)), byte(0xff&(r.address>>48)), byte(0xff&(r.address>>40)), byte(0xff&(r.address>>32)))
}

func (r *RPCAddress) getPort() int {
func (r *RPCAddress) GetPort() int {
return int(0xffff & (r.address >> 16))
}

func (r *RPCAddress) GetAddress() string {
return fmt.Sprintf("%s:%d", r.getIp(), r.getPort())
return fmt.Sprintf("%s:%d", r.GetIP(), r.GetPort())
}

func (r *RPCAddress) GetRawAddress() int64 {
Expand Down

0 comments on commit 2bcdaf5

Please sign in to comment.