Skip to content

Commit

Permalink
feat(admin-cli): support online-migrate table to another cluster usin…
Browse files Browse the repository at this point in the history
…g table-duplication (#1006)
  • Loading branch information
foreverneverer authored Jun 28, 2022
1 parent 6d4d8b6 commit 347f7b9
Show file tree
Hide file tree
Showing 11 changed files with 611 additions and 81 deletions.
1 change: 1 addition & 0 deletions admin-cli/cmd/table_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
var predefinedAppEnvKeys = []string{
"rocksdb.usage_scenario",
"replica.deny_client_write",
"replica.deny_client_request",
"replica.write_throttling",
"replica.write_throttling_by_size",
"default_ttl",
Expand Down
48 changes: 48 additions & 0 deletions admin-cli/cmd/table_migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package cmd

import (
"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/tablemigrator"
"github.com/apache/incubator-pegasus/admin-cli/shell"
"github.com/desertbit/grumble"
)

func init() {
shell.AddCommand(&grumble.Command{
Name: "table-migrator",
Help: "migrate table from current cluster to another via table duplication and metaproxy",
Flags: func(f *grumble.Flags) {
f.String("t", "table", "", "table name")
f.String("n", "node", "", "zk node, addrs:port, default equal with peagsus "+
"cluster zk addrs, you can use `cluster-info` to show it")
f.String("r", "root", "", "zk root path. the tool will update table addrs in "+
"the path of meatproxy, if you don't specify it, that is means user need manual-switch the table addrs")
f.String("c", "cluster", "", "target cluster name")
f.String("m", "meta", "", "target meta list")
f.Float64("p", "threshold", 100000, "pending mutation threshold when server will reject all write request")
},
Run: func(c *grumble.Context) error {
return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"),
c.Flags.String("node"), c.Flags.String("root"),
c.Flags.String("cluster"), c.Flags.String("meta"), c.Flags.Float64("threshold"))
},
})
}
36 changes: 36 additions & 0 deletions admin-cli/cmd/table_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package cmd

import (
"github.com/apache/incubator-pegasus/admin-cli/executor"
"github.com/apache/incubator-pegasus/admin-cli/shell"
"github.com/desertbit/grumble"
)

func init() {
shell.AddCommand(&grumble.Command{
Name: "data-version",
Help: "query data version",
Run: shell.RequireUseTable(func(c *shell.Context) error {
return executor.QueryTableVersion(pegasusClient, c.UseTable)
}),
})
}
109 changes: 28 additions & 81 deletions admin-cli/executor/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,20 @@
package executor

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/apache/incubator-pegasus/go-client/session"
"github.com/go-resty/resty/v2"
)

type httpRequest func(addr string, cmd command) (string, error)

// map[*util.PegasusNode]*cmdResult is not sorted, pass nodes is for print sorted result
type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*cmdResult)
// map[*util.PegasusNode]*util.Result is not sorted, pass nodes is for print sorted result
type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*util.Result)

type action struct {
request httpRequest
request util.HTTPRequestFunc
print printResponse
}

Expand All @@ -55,23 +49,13 @@ var sectionsMap = map[session.NodeType]string{
// TODO(jiashuo1) support collector
}

type command struct {
name string
value string
}

type response struct {
Name string
Section string
Tags string
Value string
}

type cmdResult struct {
resp string
err error
}

//TODO(jiashuo1) not support update collector config
func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, name string, actionType string, value string) error {
var nodes []*util.PegasusNode
Expand All @@ -87,11 +71,11 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n
}

if ac, ok := actionsMap[actionType]; ok {
cmd := command{
name: name,
value: value,
cmd := util.Arguments{
Name: name,
Value: value,
}
results := batchCallHTTP(nodes, ac.request, cmd)
results := util.BatchCallHTTP(nodes, ac.request, cmd)

var sortedNodeList []string
for _, n := range nodes {
Expand All @@ -106,59 +90,22 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n
return nil
}

func batchCallHTTP(nodes []*util.PegasusNode, request httpRequest, cmd command) map[string]*cmdResult {
results := make(map[string]*cmdResult)

var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(len(nodes))
for _, n := range nodes {
go func(node *util.PegasusNode) {
_, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
result, err := request(node.TCPAddr(), cmd)
mu.Lock()
if err != nil {
results[node.CombinedAddr()] = &cmdResult{err: err}
} else {
results[node.CombinedAddr()] = &cmdResult{resp: result}
}
mu.Unlock()
wg.Done()
}(n)
}
wg.Wait()

return results
}

func callHTTP(url string) (string, error) {
resp, err := resty.New().SetTimeout(time.Second * 10).R().Get(url)
if err != nil {
return "", fmt.Errorf("failed to call \"%s\": %s", url, err)
}
if resp.StatusCode() != 200 {
return "", fmt.Errorf("failed to call \"%s\": code=%d", url, resp.StatusCode())
}
return string(resp.Body()), nil
}

func listConfig(addr string, cmd command) (string, error) {
func listConfig(addr string, cmd util.Arguments) (string, error) {
url := fmt.Sprintf("http://%s/configs", addr)
return callHTTP(url)
return util.CallHTTPGet(url)
}

func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
fmt.Printf("CMD: list \n")
for _, node := range sortedNodeList {
cmdRes := results[node]
if cmdRes.err != nil {
fmt.Printf("[%s] %s\n", node, cmdRes.err)
if cmdRes.Err != nil {
fmt.Printf("[%s] %s\n", node, cmdRes.Err)
continue
}

var respMap map[string]response
err := json.Unmarshal([]byte(cmdRes.resp), &respMap)
err := json.Unmarshal([]byte(cmdRes.Resp), &respMap)
if err != nil {
fmt.Printf("[%s] %s\n", node, err)
continue
Expand All @@ -178,24 +125,24 @@ func printConfigList(nodeType session.NodeType, sortedNodeList []string, results
}
}

func getConfig(addr string, cmd command) (string, error) {
url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.name)
return callHTTP(url)
func getConfig(addr string, cmd util.Arguments) (string, error) {
url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.Name)
return util.CallHTTPGet(url)
}

func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
fmt.Printf("CMD: get \n")
for _, node := range sortedNodeList {
cmdRes := results[node]
if cmdRes.err != nil {
fmt.Printf("[%s] %s\n", node, cmdRes.err)
if cmdRes.Err != nil {
fmt.Printf("[%s] %s\n", node, cmdRes.Err)
continue
}

var resp response
err := json.Unmarshal([]byte(cmdRes.resp), &resp)
err := json.Unmarshal([]byte(cmdRes.Resp), &resp)
if err != nil {
fmt.Printf("[%s] %s\n", node, cmdRes.resp)
fmt.Printf("[%s] %s\n", node, cmdRes.Resp)
continue
}

Expand All @@ -208,22 +155,22 @@ func printConfigValue(nodeType session.NodeType, sortedNodeList []string, result
}
}

func updateConfig(addr string, cmd command) (string, error) {
url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.name, cmd.value)
return callHTTP(url)
func updateConfig(addr string, cmd util.Arguments) (string, error) {
url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.Name, cmd.Value)
return util.CallHTTPGet(url)
}

func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
fmt.Printf("CMD: set \n")
for _, node := range sortedNodeList {
cmdRes := results[node]
if cmdRes.err != nil {
fmt.Printf("[%s] %s\n", node, cmdRes.err)
if cmdRes.Err != nil {
fmt.Printf("[%s] %s\n", node, cmdRes.Err)
continue
}

var resMap map[string]string
err := json.Unmarshal([]byte(cmdRes.resp), &resMap)
err := json.Unmarshal([]byte(cmdRes.Resp), &resMap)
if err != nil {
fmt.Printf("[%s] %s\n", node, err)
continue
Expand Down
89 changes: 89 additions & 0 deletions admin-cli/executor/table_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package executor

import (
"encoding/json"
"fmt"
"strconv"

"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/apache/incubator-pegasus/go-client/session"
)

type TableDataVersion struct {
DataVersion string `json:"data_version"`
}

func QueryTableVersion(client *Client, table string) error {
version, err := QueryReplicaDataVersion(client, table)
if err != nil {
return nil
}

// formats into JSON
outputBytes, _ := json.MarshalIndent(version, "", " ")
fmt.Fprintln(client, string(outputBytes))
return nil
}

func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, error) {
nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica)
resp, err := client.Meta.QueryConfig(table)
if err != nil {
return nil, err
}

args := util.Arguments{
Name: "app_id",
Value: strconv.Itoa(int(resp.AppID)),
}
results := util.BatchCallHTTP(nodes, getTableDataVersion, args)

var finalVersion TableDataVersion
versions := make(map[string]TableDataVersion)
for _, result := range results {
if result.Err != nil {
return nil, result.Err
}
err := json.Unmarshal([]byte(result.Resp), &versions)
if err != nil {
return nil, err
}

for _, version := range versions {
if finalVersion.DataVersion == "" {
finalVersion = version
} else {
if version.DataVersion == finalVersion.DataVersion {
continue
} else {
return nil, fmt.Errorf("replica versions are not consistent")
}
}
}
}
return &finalVersion, nil
}

func getTableDataVersion(addr string, args util.Arguments) (string, error) {
url := fmt.Sprintf("http://%s/replica/data_version?%s=%s", addr, args.Name, args.Value)
return util.CallHTTPGet(url)
}
Loading

0 comments on commit 347f7b9

Please sign in to comment.