diff --git a/go-client/admin/client.go b/go-client/admin/client.go index 77851f477e..d51a0b52aa 100644 --- a/go-client/admin/client.go +++ b/go-client/admin/client.go @@ -54,6 +54,10 @@ type Client interface { // Empty `args` means "list all available tables"; Otherwise, the only parameter would // specify the status of the returned tables. ListTables(args ...interface{}) ([]*replication.AppInfo, error) + + // Empty `args` means "list all alive nodes"; Otherwise, the only parameter would + // specify the status of the returned nodes. + ListNodes(args ...interface{}) ([]*admin.NodeInfo, error) } type Config struct { @@ -234,3 +238,29 @@ func (c *rpcBasedClient) ListTables(args ...interface{}) ([]*replication.AppInfo } return c.listTables(args[0].(replication.AppStatus)) } + +func (c *rpcBasedClient) listNodes(status admin.NodeStatus) ([]*admin.NodeInfo, error) { + req := &admin.ConfigurationListNodesRequest{ + Status: status, + } + + var nodes []*admin.NodeInfo + var respErr error + err := c.callMeta("ListNodes", req, func(iresp interface{}) { + resp := iresp.(*admin.ConfigurationListNodesResponse) + nodes = resp.Infos + respErr = base.GetResponseError(resp) + }) + if err != nil { + return nodes, err + } + + return nodes, respErr +} + +func (c *rpcBasedClient) ListNodes(args ...interface{}) ([]*admin.NodeInfo, error) { + if len(args) == 0 { + return c.listNodes(admin.NodeStatus_NS_ALIVE) + } + return c.listNodes(args[0].(admin.NodeStatus)) +} diff --git a/go-client/admin/client_test.go b/go-client/admin/client_test.go index 8fdd13a4fa..6d8b12adec 100644 --- a/go-client/admin/client_test.go +++ b/go-client/admin/client_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/apache/incubator-pegasus/go-client/idl/admin" "github.com/apache/incubator-pegasus/go-client/idl/replication" "github.com/apache/incubator-pegasus/go-client/pegasus" "github.com/stretchr/testify/assert" @@ -43,6 +44,22 @@ func defaultConfig() Config { } } +func defaultReplicaServerPorts() []int { + return []int{34801, 34802, 34803} +} + +func timeoutConfig() Config { + return Config{ + MetaServers: []string{"0.0.0.0:123456"}, + Timeout: 500 * time.Millisecond, + } +} + +func testAdmin_Timeout(t *testing.T, exec func(c Client) error) { + c := NewClient(timeoutConfig()) + assert.Equal(t, context.DeadlineExceeded, exec(c)) +} + func TestAdmin_Table(t *testing.T) { c := NewClient(defaultConfig()) @@ -75,13 +92,10 @@ func TestAdmin_Table(t *testing.T) { } func TestAdmin_ListTablesTimeout(t *testing.T) { - c := NewClient(Config{ - MetaServers: []string{"0.0.0.0:123456"}, - Timeout: 500 * time.Millisecond, + testAdmin_Timeout(t, func(c Client) error { + _, err := c.ListTables() + return err }) - - _, err := c.ListTables() - assert.Equal(t, err, context.DeadlineExceeded) } // Ensures after the call `CreateTable` ends, the table must be right available to access. @@ -145,3 +159,32 @@ func TestAdmin_GetAppEnvs(t *testing.T) { assert.Empty(t, tb.Envs) } } + +func TestAdmin_ListNodes(t *testing.T) { + c := NewClient(defaultConfig()) + + nodes, err := c.ListNodes() + assert.Nil(t, err) + + expectedReplicaServerPorts := defaultReplicaServerPorts() + + // Compare slice length. + assert.Equal(t, len(expectedReplicaServerPorts), len(nodes)) + + actualReplicaServerPorts := make([]int, len(nodes)) + for i, node := range nodes { + // Each node should be alive. + assert.Equal(t, admin.NodeStatus_NS_ALIVE, node.Status) + actualReplicaServerPorts[i] = node.Address.GetPort() + } + + // Match elements without extra ordering. + assert.ElementsMatch(t, expectedReplicaServerPorts, actualReplicaServerPorts) +} + +func TestAdmin_ListNodesTimeout(t *testing.T) { + testAdmin_Timeout(t, func(c Client) error { + _, err := c.ListNodes() + return err + }) +} diff --git a/go-client/idl/base/rpc_address.go b/go-client/idl/base/rpc_address.go index 9121cd5cde..d451c42284 100644 --- a/go-client/idl/base/rpc_address.go +++ b/go-client/idl/base/rpc_address.go @@ -57,16 +57,16 @@ func (r *RPCAddress) String() string { return fmt.Sprintf("RPCAddress(%s)", r.GetAddress()) } -func (r *RPCAddress) getIp() net.IP { +func (r *RPCAddress) GetIP() net.IP { return net.IPv4(byte(0xff&(r.address>>56)), byte(0xff&(r.address>>48)), byte(0xff&(r.address>>40)), byte(0xff&(r.address>>32))) } -func (r *RPCAddress) getPort() int { +func (r *RPCAddress) GetPort() int { return int(0xffff & (r.address >> 16)) } func (r *RPCAddress) GetAddress() string { - return fmt.Sprintf("%s:%d", r.getIp(), r.getPort()) + return fmt.Sprintf("%s:%d", r.GetIP(), r.GetPort()) } func (r *RPCAddress) GetRawAddress() int64 {