Skip to content

Commit

Permalink
Merge pull request #1776 from michael-berlin/vtctl_throttler_vtctl_cmds
Browse files Browse the repository at this point in the history
vtctl: Added throttler commands. vtworker: Enabled throttler RPC server.
  • Loading branch information
michael-berlin authored Jun 10, 2016
2 parents fd03d8b + e45f1da commit 5f98e05
Show file tree
Hide file tree
Showing 14 changed files with 427 additions and 8 deletions.
62 changes: 61 additions & 1 deletion doc/vtctlReference.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Commands are listed in the following groups:
* [Keyspaces](#keyspaces)
* [Queries](#queries)
* [Replication Graph](#replication-graph)
* [Resharding Throttler](#resharding-throttler)
* [Schema, Version, Permissions](#schema-version-permissions)
* [Serving Graph](#serving-graph)
* [Shards](#shards)
Expand Down Expand Up @@ -800,6 +801,65 @@ Outputs a JSON structure that contains information about the ShardReplication.
* The <code>&lt;cell&gt;</code> and <code>&lt;keyspace/shard&gt;</code> arguments are required for the <code>&lt;GetShardReplication&gt;</code> command. This error occurs if the command is not called with exactly 2 arguments.


## Resharding Throttler

* [ThrottlerMaxRates](#throttlermaxrates)
* [ThrottlerSetMaxRate](#throttlersetmaxrate)

### ThrottlerMaxRates

Returns the current max rate of all active resharding throttlers on the server.

#### Example

<pre class="command-example">ThrottlerMaxRates -server &lt;vtworker or vttablet&gt;</pre>

#### Flags

| Name | Type | Definition |
| :-------- | :--------- | :--------- |
| server | string | vtworker or vttablet to connect to |


#### Arguments

* <code>&lt;vtworker or vttablet&gt;</code> &ndash; Required.

#### Errors

* the ThrottlerSetMaxRate command does not accept any positional parameters This error occurs if the command is not called with exactly 0 arguments.
* error creating a throttler client for <code>&lt;server&gt;</code> '%v': %v
* failed to get the throttler rate from <code>&lt;server&gt;</code> '%v': %v


### ThrottlerSetMaxRate

Sets the max rate for all active resharding throttlers on the server.

#### Example

<pre class="command-example">ThrottlerSetMaxRate -server &lt;vtworker or vttablet&gt; &lt;rate&gt;</pre>

#### Flags

| Name | Type | Definition |
| :-------- | :--------- | :--------- |
| server | string | vtworker or vttablet to connect to |


#### Arguments

* <code>&lt;vtworker or vttablet&gt;</code> &ndash; Required.
* <code>&lt;rate&gt;</code> &ndash; Required.

#### Errors

* the <code>&lt;rate&gt;</code> argument is required for the <code>&lt;ThrottlerSetMaxRate&gt;</code> command This error occurs if the command is not called with exactly one argument.
* failed to parse rate '%v' as integer value: %v
* error creating a throttler client for <code>&lt;server&gt;</code> '%v': %v
* failed to set the throttler rate on <code>&lt;server&gt;</code> '%v': %v


## Schema, Version, Permissions

* [ApplySchema](#applyschema)
Expand Down Expand Up @@ -1512,7 +1572,7 @@ Walks through a ShardReplication object and fixes the first error that it encoun

### ShardReplicationPositions

Shows the replication status of each slave machine in the shard graph. In this case, the status refers to the replication lag between the master vttablet and the slave vttablet. In Vitess, data is always written to the master vttablet first and then replicated to all slave vttablets.
Shows the replication status of each slave machine in the shard graph. In this case, the status refers to the replication lag between the master vttablet and the slave vttablet. In Vitess, data is always written to the master vttablet first and then replicated to all slave vttablets. Output is sorted by tablet type, then replication position. Use ctrl-C to interrupt command and see partial result if needed.

#### Example

Expand Down
11 changes: 11 additions & 0 deletions go/cmd/vtctl/plugin_grpcthrottlerclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

// Imports and register the gRPC throttler client.

import (
_ "github.com/youtube/vitess/go/vt/throttler/grpcthrottlerclient"
)
11 changes: 11 additions & 0 deletions go/cmd/vtctld/plugin_grpcthrottlerclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

// Imports and register the gRPC throttler client.

import (
_ "github.com/youtube/vitess/go/vt/throttler/grpcthrottlerclient"
)
11 changes: 11 additions & 0 deletions go/cmd/vtworker/plugin_grpcthrottlerserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

// Imports and register the gRPC throttler server.

import (
_ "github.com/youtube/vitess/go/vt/throttler/grpcthrottlerserver"
)
132 changes: 132 additions & 0 deletions go/vt/vtctl/throttler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package vtctl

import (
"flag"
"fmt"
"strconv"
"strings"
"time"

"github.com/olekukonko/tablewriter"
"github.com/youtube/vitess/go/vt/throttler"
"github.com/youtube/vitess/go/vt/throttler/throttlerclient"
"github.com/youtube/vitess/go/vt/wrangler"
"golang.org/x/net/context"
)

// This file contains the commands to control the throttler which is used during
// resharding (vtworker) and by filtered replication (vttablet).

const throttlerGroupName = "Resharding Throttler"
const shortTimeout = 15 * time.Second

func init() {
addCommandGroup(throttlerGroupName)

addCommand(throttlerGroupName, command{
"ThrottlerMaxRates",
commandThrottlerMaxRates,
"-server <vtworker or vttablet>",
"Returns the current max rate of all active resharding throttlers on the server."})
addCommand(throttlerGroupName, command{
"ThrottlerSetMaxRate",
commandThrottlerSetMaxRate,
"-server <vtworker or vttablet> <rate>",
"Sets the max rate for all active resharding throttlers on the server."})
}

func commandThrottlerMaxRates(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
server := subFlags.String("server", "", "vtworker or vttablet to connect to")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 0 {
return fmt.Errorf("the ThrottlerSetMaxRate command does not accept any positional parameters")
}

// Connect to the server.
ctx, cancel := context.WithTimeout(ctx, shortTimeout)
defer cancel()
client, err := throttlerclient.New(*server)
if err != nil {
return fmt.Errorf("error creating a throttler client for server '%v': %v", *server, err)
}
defer client.Close()

rates, err := client.MaxRates(ctx)
if err != nil {
return fmt.Errorf("failed to get the throttler rate from server '%v': %v", *server, err)
}

if len(rates) == 0 {
wr.Logger().Printf("There are no active throttlers on server '%v'.\n", *server)
return nil
}

table := tablewriter.NewWriter(loggerWriter{wr.Logger()})
table.SetAutoFormatHeaders(false)
table.SetHeader([]string{"Name", "Rate"})
for name, rate := range rates {
rateText := strconv.FormatInt(rate, 10)
if rate == throttler.MaxRateModuleDisabled {
rateText = "unlimited"
}
table.Append([]string{name, rateText})
}
table.Render()
wr.Logger().Printf("%d active throttler(s) on server '%v'.\n", len(rates), *server)
return nil
}

func commandThrottlerSetMaxRate(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
server := subFlags.String("server", "", "vtworker or vttablet to connect to")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("the <rate> argument is required for the ThrottlerSetMaxRate command")
}
var rate int64
if strings.ToLower(subFlags.Arg(0)) == "unlimited" {
rate = throttler.MaxRateModuleDisabled
} else {
var err error
rate, err = strconv.ParseInt(subFlags.Arg(0), 0, 64)
if err != nil {
return fmt.Errorf("failed to parse rate '%v' as integer value: %v", subFlags.Arg(0), err)
}
}

// Connect to the server.
ctx, cancel := context.WithTimeout(ctx, shortTimeout)
defer cancel()
client, err := throttlerclient.New(*server)
if err != nil {
return fmt.Errorf("error creating a throttler client for server '%v': %v", *server, err)
}
defer client.Close()

names, err := client.SetMaxRate(ctx, rate)
if err != nil {
return fmt.Errorf("failed to set the throttler rate on server '%v': %v", *server, err)
}

if len(names) == 0 {
wr.Logger().Printf("ThrottlerSetMaxRate did nothing because server '%v' has no active throttlers.\n", *server)
return nil
}

table := tablewriter.NewWriter(loggerWriter{wr.Logger()})
table.SetAutoFormatHeaders(false)
table.SetHeader([]string{"Name"})
for _, name := range names {
table.Append([]string{name})
}
table.Render()
wr.Logger().Printf("%d active throttler(s) on server '%v' were updated.\n", len(names), *server)
return nil
}
106 changes: 106 additions & 0 deletions go/vt/wrangler/testlib/throttler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package testlib

import (
"fmt"
"net"
"strings"
"testing"

"google.golang.org/grpc"

"github.com/youtube/vitess/go/vt/throttler"
"github.com/youtube/vitess/go/vt/throttler/grpcthrottlerserver"
"github.com/youtube/vitess/go/vt/zktopo/zktestserver"

// The test uses the gRPC throttler client and server implementations.
_ "github.com/youtube/vitess/go/vt/throttler/grpcthrottlerclient"
)

// TestVtctlThrottlerCommands tests all vtctl commands from the
// "Resharding Throttler" group.
func TestVtctlThrottlerCommands(t *testing.T) {
// Run a throttler server using the default process throttle manager.
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
}
s := grpc.NewServer()
go s.Serve(listener)
grpcthrottlerserver.StartServer(s, throttler.GlobalManager)

addr := fmt.Sprintf("localhost:%v", listener.Addr().(*net.TCPAddr).Port)

ts := zktestserver.New(t, []string{"cell1", "cell2"})
vp := NewVtctlPipe(t, ts)
defer vp.Close()

// Get and set rate commands do not fail when no throttler is registered.
{
got, err := vp.RunAndOutput([]string{"ThrottlerMaxRates", "-server", addr})
if err != nil {
t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
}
want := "no active throttlers"
if !strings.Contains(got, want) {
t.Fatalf("ThrottlerMaxRates() = %v, want substring = %v", got, want)
}
}

{
got, err := vp.RunAndOutput([]string{"ThrottlerSetMaxRate", "-server", addr, "23"})
if err != nil {
t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
}
want := "no active throttlers"
if !strings.Contains(got, want) {
t.Fatalf("ThrottlerSetMaxRate(23) = %v, want substring = %v", got, want)
}
}

// Add a throttler and check the commands again.
t1, err := throttler.NewThrottler("t1", "TPS", 1 /* threadCount */, 2323, throttler.ReplicationLagModuleDisabled)
if err != nil {
t.Fatal(err)
}
defer t1.Close()
// MaxRates() will return the initial rate.
expectRate(t, vp, addr, "2323")

// Disable the module by setting the rate to 'unlimited'.
setRate(t, vp, addr, "unlimited")
expectRate(t, vp, addr, "unlimited")

// Re-enable it by setting a limit.
setRate(t, vp, addr, "9999")
expectRate(t, vp, addr, "9999")
}

func setRate(t *testing.T, vp *VtctlPipe, addr, rateStr string) {
got, err := vp.RunAndOutput([]string{"ThrottlerSetMaxRate", "-server", addr, rateStr})
if err != nil {
t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
}
want := "t1"
if !strings.Contains(got, want) {
t.Fatalf("ThrottlerSetMaxRate(%v) = %v, want substring = %v", rateStr, got, want)
}
}

func expectRate(t *testing.T, vp *VtctlPipe, addr, rateStr string) {
got, err := vp.RunAndOutput([]string{"ThrottlerMaxRates", "-server", addr})
if err != nil {
t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
}
want := "1 active throttler"
if !strings.Contains(got, want) {
t.Fatalf("ThrottlerMaxRates() = %v, want substring = %v", got, want)
}
want2 := fmt.Sprintf("| %v |", rateStr)
if !strings.Contains(got, want2) {
t.Fatalf("ThrottlerMaxRates() = %v, want substring = %v", got, want2)
}
}
19 changes: 18 additions & 1 deletion go/vt/wrangler/testlib/vtctl_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package testlib

import (
"bytes"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -82,6 +83,22 @@ func (vp *VtctlPipe) Close() {
// Run executes the provided command remotely, logs the output in the
// test logs, and returns the command error.
func (vp *VtctlPipe) Run(args []string) error {
return vp.run(args, func(line string) {
vp.t.Log(line)
})
}

// RunAndOutput is similar to Run, but returns the output as a multi-line string
// instead of logging it.
func (vp *VtctlPipe) RunAndOutput(args []string) (string, error) {
var output bytes.Buffer
err := vp.run(args, func(line string) {
output.WriteString(line)
})
return output.String(), err
}

func (vp *VtctlPipe) run(args []string, outputFunc func(string)) error {
actionTimeout := 30 * time.Second
ctx := context.Background()

Expand All @@ -93,7 +110,7 @@ func (vp *VtctlPipe) Run(args []string) error {
le, err := stream.Recv()
switch err {
case nil:
vp.t.Logf(logutil.EventString(le))
outputFunc(logutil.EventString(le))
case io.EOF:
return nil
default:
Expand Down
Loading

0 comments on commit 5f98e05

Please sign in to comment.