Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vtctl: Added throttler commands. vtworker: Enabled throttler RPC server. #1776

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion doc/vtctlReference.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Commands are listed in the following groups:
* [Keyspaces](#keyspaces)
* [Queries](#queries)
* [Replication Graph](#replication-graph)
* [Resharding Throttler](#resharding-throttler)
* [Schema, Version, Permissions](#schema-version-permissions)
* [Serving Graph](#serving-graph)
* [Shards](#shards)
Expand Down Expand Up @@ -800,6 +801,65 @@ Outputs a JSON structure that contains information about the ShardReplication.
* The <code>&lt;cell&gt;</code> and <code>&lt;keyspace/shard&gt;</code> arguments are required for the <code>&lt;GetShardReplication&gt;</code> command. This error occurs if the command is not called with exactly 2 arguments.


## Resharding Throttler

* [ThrottlerMaxRates](#throttlermaxrates)
* [ThrottlerSetMaxRate](#throttlersetmaxrate)

### ThrottlerMaxRates

Returns the current max rate of all active resharding throttlers on the server.

#### Example

<pre class="command-example">ThrottlerMaxRates -server &lt;vtworker or vttablet&gt;</pre>

#### Flags

| Name | Type | Definition |
| :-------- | :--------- | :--------- |
| server | string | vtworker or vttablet to connect to |


#### Arguments

* <code>&lt;vtworker or vttablet&gt;</code> &ndash; Required.

#### Errors

* the ThrottlerSetMaxRate command does not accept any positional parameters This error occurs if the command is not called with exactly 0 arguments.
* error creating a throttler client for <code>&lt;server&gt;</code> '%v': %v
* failed to get the throttler rate from <code>&lt;server&gt;</code> '%v': %v


### ThrottlerSetMaxRate

Sets the max rate for all active resharding throttlers on the server.

#### Example

<pre class="command-example">ThrottlerSetMaxRate -server &lt;vtworker or vttablet&gt; &lt;rate&gt;</pre>

#### Flags

| Name | Type | Definition |
| :-------- | :--------- | :--------- |
| server | string | vtworker or vttablet to connect to |


#### Arguments

* <code>&lt;vtworker or vttablet&gt;</code> &ndash; Required.
* <code>&lt;rate&gt;</code> &ndash; Required.

#### Errors

* the <code>&lt;rate&gt;</code> argument is required for the <code>&lt;ThrottlerSetMaxRate&gt;</code> command This error occurs if the command is not called with exactly one argument.
* failed to parse rate '%v' as integer value: %v
* error creating a throttler client for <code>&lt;server&gt;</code> '%v': %v
* failed to set the throttler rate on <code>&lt;server&gt;</code> '%v': %v


## Schema, Version, Permissions

* [ApplySchema](#applyschema)
Expand Down Expand Up @@ -1512,7 +1572,7 @@ Walks through a ShardReplication object and fixes the first error that it encoun

### ShardReplicationPositions

Shows the replication status of each slave machine in the shard graph. In this case, the status refers to the replication lag between the master vttablet and the slave vttablet. In Vitess, data is always written to the master vttablet first and then replicated to all slave vttablets.
Shows the replication status of each slave machine in the shard graph. In this case, the status refers to the replication lag between the master vttablet and the slave vttablet. In Vitess, data is always written to the master vttablet first and then replicated to all slave vttablets. Output is sorted by tablet type, then replication position. Use ctrl-C to interrupt command and see partial result if needed.

#### Example

Expand Down
11 changes: 11 additions & 0 deletions go/cmd/vtctl/plugin_grpcthrottlerclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

// Imports and register the gRPC throttler client.

import (
_ "github.com/youtube/vitess/go/vt/throttler/grpcthrottlerclient"
)
11 changes: 11 additions & 0 deletions go/cmd/vtctld/plugin_grpcthrottlerclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

// Imports and register the gRPC throttler client.

import (
_ "github.com/youtube/vitess/go/vt/throttler/grpcthrottlerclient"
)
11 changes: 11 additions & 0 deletions go/cmd/vtworker/plugin_grpcthrottlerserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

// Imports and register the gRPC throttler server.

import (
_ "github.com/youtube/vitess/go/vt/throttler/grpcthrottlerserver"
)
132 changes: 132 additions & 0 deletions go/vt/vtctl/throttler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package vtctl

import (
"flag"
"fmt"
"strconv"
"strings"
"time"

"github.com/olekukonko/tablewriter"
"github.com/youtube/vitess/go/vt/throttler"
"github.com/youtube/vitess/go/vt/throttler/throttlerclient"
"github.com/youtube/vitess/go/vt/wrangler"
"golang.org/x/net/context"
)

// This file contains the commands to control the throttler which is used during
// resharding (vtworker) and by filtered replication (vttablet).

const throttlerGroupName = "Resharding Throttler"
const shortTimeout = 15 * time.Second

func init() {
addCommandGroup(throttlerGroupName)

addCommand(throttlerGroupName, command{
"ThrottlerMaxRates",
commandThrottlerMaxRates,
"-server <vtworker or vttablet>",
"Returns the current max rate of all active resharding throttlers on the server."})
addCommand(throttlerGroupName, command{
"ThrottlerSetMaxRate",
commandThrottlerSetMaxRate,
"-server <vtworker or vttablet> <rate>",
"Sets the max rate for all active resharding throttlers on the server."})
}

func commandThrottlerMaxRates(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
server := subFlags.String("server", "", "vtworker or vttablet to connect to")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 0 {
return fmt.Errorf("the ThrottlerSetMaxRate command does not accept any positional parameters")
}

// Connect to the server.
ctx, cancel := context.WithTimeout(ctx, shortTimeout)
defer cancel()
client, err := throttlerclient.New(*server)
if err != nil {
return fmt.Errorf("error creating a throttler client for server '%v': %v", *server, err)
}
defer client.Close()

rates, err := client.MaxRates(ctx)
if err != nil {
return fmt.Errorf("failed to get the throttler rate from server '%v': %v", *server, err)
}

if len(rates) == 0 {
wr.Logger().Printf("There are no active throttlers on server '%v'.\n", *server)
return nil
}

table := tablewriter.NewWriter(loggerWriter{wr.Logger()})
table.SetAutoFormatHeaders(false)
table.SetHeader([]string{"Name", "Rate"})
for name, rate := range rates {
rateText := strconv.FormatInt(rate, 10)
if rate == throttler.MaxRateModuleDisabled {
rateText = "unlimited"
}
table.Append([]string{name, rateText})
}
table.Render()
wr.Logger().Printf("%d active throttler(s) on server '%v'.\n", len(rates), *server)
return nil
}

func commandThrottlerSetMaxRate(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
server := subFlags.String("server", "", "vtworker or vttablet to connect to")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("the <rate> argument is required for the ThrottlerSetMaxRate command")
}
var rate int64
if strings.ToLower(subFlags.Arg(0)) == "unlimited" {
rate = throttler.MaxRateModuleDisabled
} else {
var err error
rate, err = strconv.ParseInt(subFlags.Arg(0), 0, 64)
if err != nil {
return fmt.Errorf("failed to parse rate '%v' as integer value: %v", subFlags.Arg(0), err)
}
}

// Connect to the server.
ctx, cancel := context.WithTimeout(ctx, shortTimeout)
defer cancel()
client, err := throttlerclient.New(*server)
if err != nil {
return fmt.Errorf("error creating a throttler client for server '%v': %v", *server, err)
}
defer client.Close()

names, err := client.SetMaxRate(ctx, rate)
if err != nil {
return fmt.Errorf("failed to set the throttler rate on server '%v': %v", *server, err)
}

if len(names) == 0 {
wr.Logger().Printf("ThrottlerSetMaxRate did nothing because server '%v' has no active throttlers.\n", *server)
return nil
}

table := tablewriter.NewWriter(loggerWriter{wr.Logger()})
table.SetAutoFormatHeaders(false)
table.SetHeader([]string{"Name"})
for _, name := range names {
table.Append([]string{name})
}
table.Render()
wr.Logger().Printf("%d active throttler(s) on server '%v' were updated.\n", len(names), *server)
return nil
}
106 changes: 106 additions & 0 deletions go/vt/wrangler/testlib/throttler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package testlib

import (
"fmt"
"net"
"strings"
"testing"

"google.golang.org/grpc"

"github.com/youtube/vitess/go/vt/throttler"
"github.com/youtube/vitess/go/vt/throttler/grpcthrottlerserver"
"github.com/youtube/vitess/go/vt/zktopo/zktestserver"

// The test uses the gRPC throttler client and server implementations.
_ "github.com/youtube/vitess/go/vt/throttler/grpcthrottlerclient"
)

// TestVtctlThrottlerCommands tests all vtctl commands from the
// "Resharding Throttler" group.
func TestVtctlThrottlerCommands(t *testing.T) {
// Run a throttler server using the default process throttle manager.
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
}
s := grpc.NewServer()
go s.Serve(listener)
grpcthrottlerserver.StartServer(s, throttler.GlobalManager)

addr := fmt.Sprintf("localhost:%v", listener.Addr().(*net.TCPAddr).Port)

ts := zktestserver.New(t, []string{"cell1", "cell2"})
vp := NewVtctlPipe(t, ts)
defer vp.Close()

// Get and set rate commands do not fail when no throttler is registered.
{
got, err := vp.RunAndOutput([]string{"ThrottlerMaxRates", "-server", addr})
if err != nil {
t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
}
want := "no active throttlers"
if !strings.Contains(got, want) {
t.Fatalf("ThrottlerMaxRates() = %v, want substring = %v", got, want)
}
}

{
got, err := vp.RunAndOutput([]string{"ThrottlerSetMaxRate", "-server", addr, "23"})
if err != nil {
t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
}
want := "no active throttlers"
if !strings.Contains(got, want) {
t.Fatalf("ThrottlerSetMaxRate(23) = %v, want substring = %v", got, want)
}
}

// Add a throttler and check the commands again.
t1, err := throttler.NewThrottler("t1", "TPS", 1 /* threadCount */, 2323, throttler.ReplicationLagModuleDisabled)
if err != nil {
t.Fatal(err)
}
defer t1.Close()
// MaxRates() will return the initial rate.
expectRate(t, vp, addr, "2323")

// Disable the module by setting the rate to 'unlimited'.
setRate(t, vp, addr, "unlimited")
expectRate(t, vp, addr, "unlimited")

// Re-enable it by setting a limit.
setRate(t, vp, addr, "9999")
expectRate(t, vp, addr, "9999")
}

func setRate(t *testing.T, vp *VtctlPipe, addr, rateStr string) {
got, err := vp.RunAndOutput([]string{"ThrottlerSetMaxRate", "-server", addr, rateStr})
if err != nil {
t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
}
want := "t1"
if !strings.Contains(got, want) {
t.Fatalf("ThrottlerSetMaxRate(%v) = %v, want substring = %v", rateStr, got, want)
}
}

func expectRate(t *testing.T, vp *VtctlPipe, addr, rateStr string) {
got, err := vp.RunAndOutput([]string{"ThrottlerMaxRates", "-server", addr})
if err != nil {
t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
}
want := "1 active throttler"
if !strings.Contains(got, want) {
t.Fatalf("ThrottlerMaxRates() = %v, want substring = %v", got, want)
}
want2 := fmt.Sprintf("| %v |", rateStr)
if !strings.Contains(got, want2) {
t.Fatalf("ThrottlerMaxRates() = %v, want substring = %v", got, want2)
}
}
19 changes: 18 additions & 1 deletion go/vt/wrangler/testlib/vtctl_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package testlib

import (
"bytes"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -82,6 +83,22 @@ func (vp *VtctlPipe) Close() {
// Run executes the provided command remotely, logs the output in the
// test logs, and returns the command error.
func (vp *VtctlPipe) Run(args []string) error {
return vp.run(args, func(line string) {
vp.t.Log(line)
})
}

// RunAndOutput is similar to Run, but returns the output as a multi-line string
// instead of logging it.
func (vp *VtctlPipe) RunAndOutput(args []string) (string, error) {
var output bytes.Buffer
err := vp.run(args, func(line string) {
output.WriteString(line)
})
return output.String(), err
}

func (vp *VtctlPipe) run(args []string, outputFunc func(string)) error {
actionTimeout := 30 * time.Second
ctx := context.Background()

Expand All @@ -93,7 +110,7 @@ func (vp *VtctlPipe) Run(args []string) error {
le, err := stream.Recv()
switch err {
case nil:
vp.t.Logf(logutil.EventString(le))
outputFunc(logutil.EventString(le))
case io.EOF:
return nil
default:
Expand Down
Loading