Skip to content

Commit

Permalink
Adds the core of vtshovel program
Browse files Browse the repository at this point in the history
* Adds binary to run vtshovel.
* At the moment only working in ephemeral mode (i.e no data is persisted back to
  vrsettings).
* vtshovel only works for statement based replication right now. This is due to
  now having a good way to have a schema loader. We will itereate on this.

Signed-off-by: Rafael Chacon <[email protected]>
  • Loading branch information
rafael committed Oct 11, 2019
1 parent f903605 commit c3c238b
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 46 deletions.
251 changes: 251 additions & 0 deletions go/cmd/vtshovel/vtshovel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"encoding/json"
"flag"
"io/ioutil"
"math/rand"
"regexp"
"strings"
"time"

"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

var (
vtShovelConfigFile = flag.String("vtshovel-config-file", "/etc/slack.d/vtshovel.json", "VTShovel Config file")
dryRun = flag.Bool("dry-run", false, "When present, only log DML that are going to be performed in target database")

autoIncr = regexp.MustCompile(` AUTO_INCREMENT=\d+`)
)

func init() {
rand.Seed(time.Now().UnixNano())
servenv.RegisterDefaultFlags()
}

// VtShovelConfig fields to configure vtshovel client
type VtShovelConfig struct {
// Source MySQL client information

// MySQLSourceHost ...
MySQLSourceHost string `json:"mysql_source_host"`
// MySQLSourcePort ...
MySQLSourcePort int `json:"mysql_source_port"`
// MySQLSourceUser ...
MySQLSourceUser string `json:"mysql_source_user"`
// MySQLSourcePassword ...
MySQLSourcePassword string `json:"mysql_source_password"`
// MySQLSourceBinlogStartPos ...
MySQLSourceBinlogStartPos string `json:"mysql_source_binlog_start_pos"`
// MySQLSourceDatabase ...
MySQLSourceDBName string `json:"mysql_source_dbname"`

// Target MySQL client information

// MySQLTargetHost ...
MySQLTargetHost string `json:"mysql_target_host"`
// MySQLTargetPort ...
MySQLTargetPort int `json:"mysql_target_port"`
// MySQLTargetUser ...
MySQLTargetUser string `json:"mysql_target_user"`
// MySQLTargetPassword ...
MySQLTargetPassword string `json:"mysql_target_password"`
// MySQLTargetDBName ...
MySQLTargetDBName string `json:"mysql_target_dbname"`
}

func main() {
defer exit.Recover()

servenv.ParseFlags("vtshovel")
servenv.Init()

servenv.OnRun(func() {
//vreplication.MySQLAddStatusPart()
// Flags are parsed now. Parse the template using the actual flag value and overwrite the current template.
//addStatusParts(vtg)
})

vtShovelConfig, err := loadConfigFromFile(*vtShovelConfigFile)
if err != nil {
log.Fatal(err)
}

targetConnParams := mysql.ConnParams{
Host: vtShovelConfig.MySQLTargetHost,
Port: vtShovelConfig.MySQLTargetPort,
Pass: vtShovelConfig.MySQLTargetPassword,
Uname: vtShovelConfig.MySQLTargetUser,
DbName: vtShovelConfig.MySQLTargetDBName,
}
dbTargetClient := newVtShovelDbClient(
binlogplayer.NewDBClient(&targetConnParams),
vtShovelConfig.MySQLSourceBinlogStartPos,
)

if err := dbTargetClient.Connect(); err != nil {
log.Fatal(vterrors.Wrap(err, "can't connect to database"))
}

sourceConnParams := mysql.ConnParams{
Host: vtShovelConfig.MySQLSourceHost,
Port: vtShovelConfig.MySQLSourcePort,
Pass: vtShovelConfig.MySQLSourcePassword,
Uname: vtShovelConfig.MySQLSourceUser,
}

servenv.OnClose(dbTargetClient.Close)

source := binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
&binlogdatapb.Rule{
Match: "/" + vtShovelConfig.MySQLSourceDBName + ".*/",
},
},
},
}
ctx := context.Background()
sourceVstreamClient := vreplication.NewMySQLVStreamerClient(&sourceConnParams)
go func() {
replicator := vreplication.NewVReplicator(
1,
&source,
sourceVstreamClient,
binlogplayer.NewStats(),
dbTargetClient,
newVtShovelSchemaLoader(),
)
replicator.Replicate(ctx)
if err != nil {
log.Infof("Error starting stream: %v", err)

}
return
}()
servenv.RunDefault()
}

func loadConfigFromFile(file string) (*VtShovelConfig, error) {
data, err := ioutil.ReadFile(file)
if err != nil {
return nil, vterrors.Wrapf(err, "Failed to read %v file", file)
}
vtShovelConfig := &VtShovelConfig{}
err = json.Unmarshal(data, vtShovelConfig)
if err != nil {
return nil, vterrors.Wrap(err, "Error parsing auth server config")
}
return vtShovelConfig, nil
}

type vtShovelDbClient struct {
dbClient binlogplayer.DBClient
startPos string
}

type vtShovelSchemaLoader struct{}

func newVtShovelDbClient(dbClient binlogplayer.DBClient, startPos string) binlogplayer.DBClient {
return &vtShovelDbClient{
dbClient: dbClient,
startPos: startPos,
}
}

func newVtShovelSchemaLoader() vreplication.SchemasLoader {
return &vtShovelSchemaLoader{}
}

func (vdc *vtShovelDbClient) DBName() string {
return vdc.dbClient.DBName()
}

func (vdc *vtShovelDbClient) Connect() error {
return vdc.dbClient.Connect()
}

func (vdc *vtShovelDbClient) Begin() error {
return vdc.dbClient.Begin()
}

func (vdc *vtShovelDbClient) Commit() error {
return vdc.dbClient.Commit()
}

func (vdc *vtShovelDbClient) Rollback() error {
return vdc.dbClient.Rollback()
}

func (vdc *vtShovelDbClient) Close() {
vdc.dbClient.Close()
}

func (vdc *vtShovelDbClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
if strings.Contains(query, "from _vt.copy_state") {
dummyResult := &sqltypes.Result{
Rows: [][]sqltypes.Value{
[]sqltypes.Value{
sqltypes.NewInt64(0),
},
},
}
return dummyResult, nil
}

if strings.Contains(query, "from _vt.vreplication") {
dummyResult := &sqltypes.Result{
Rows: [][]sqltypes.Value{
[]sqltypes.Value{
sqltypes.NewVarBinary(vdc.startPos),
sqltypes.NewVarBinary(""), // StopPos
sqltypes.NewInt64(10000), // maxTPS
sqltypes.NewInt64(10000), // maxReplicationLag
sqltypes.NewVarBinary("Running"), // state
},
},
}
return dummyResult, nil
}

if strings.Contains(query, "update _vt.vreplication") {
return &sqltypes.Result{}, nil
}
return vdc.dbClient.ExecuteFetch(query, maxrows)
}

func (vsl *vtShovelSchemaLoader) GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
// TODO: This will only work for stament based replication.
return &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{},
}, nil
}
2 changes: 1 addition & 1 deletion go/mysql/replication_position.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func EncodePosition(rp Position) string {
// to BinlogFilePos
func ParseFilePosition(s string) (rp BinlogFilePos, err error) {
if s == "" {
return rp, nil
return rp, vterrors.Errorf(vtrpc.Code_INTERNAL, "parse error: unknown file:pos format %#v", s)
}

parts := strings.SplitN(s, ":", 2)
Expand Down
15 changes: 7 additions & 8 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
return err
}

if settings.GtidStartPos != nil {
blp.position = *settings.GtidStartPos
if !settings.GtidStartPos.IsZero() {
blp.position = settings.GtidStartPos
}
blp.stopPosition = settings.StopPos
t, err := throttler.NewThrottler(
Expand Down Expand Up @@ -525,7 +525,7 @@ type VRSettings struct {
MaxTPS int64
MaxReplicationLag int64
State string
GtidStartPos *mysql.Position
GtidStartPos mysql.Position
}

// ReadVRSettings retrieves the throttler settings for
Expand All @@ -551,18 +551,17 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
return VRSettings{}, fmt.Errorf("failed to parse max_replication_lag column: %v", err)
}
startPos := vrRow[0].ToString()
gtidStartPos, err := mysql.DecodePosition(startPos)
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse pos column: %v", err)
}
// TODO: This will be removed when we start using filename:pos flavor and everythign will by a proper enconded mysql.Position
gtidStartPos, _ := mysql.DecodePosition(startPos)

stopPos, err := mysql.DecodePosition(vrRow[1].ToString())
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse stop_pos column: %v", err)
}

return VRSettings{
StartPos: startPos,
GtidStartPos: &gtidStartPos,
GtidStartPos: gtidStartPos,
StopPos: stopPos,
MaxTPS: maxTPS,
MaxReplicationLag: maxReplicationLag,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
return err
}
vsClient := NewTabletVStreamerClient(tablet)
vreplicator := newVReplicator(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld)
vreplicator := NewVReplicator(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld)
return vreplicator.Replicate(ctx)
}
return fmt.Errorf("missing source")
Expand Down
14 changes: 8 additions & 6 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (

type vplayer struct {
vr *vreplicator
startPos mysql.Position
startPos string
gtidStartPos mysql.Position
stopPos mysql.Position
startBinlogFilePos *mysql.BinlogFilePos
saveStop bool
Expand Down Expand Up @@ -66,8 +67,9 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
}
return &vplayer{
vr: vr,
startPos: *settings.GtidStartPos,
pos: *settings.GtidStartPos,
startPos: settings.StartPos,
gtidStartPos: settings.GtidStartPos,
pos: settings.GtidStartPos,
stopPos: settings.StopPos,
saveStop: saveStop,
copyState: copyState,
Expand All @@ -78,9 +80,9 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map

// play is not resumable. If pausePos is set, play returns without updating the vreplication state.
func (vp *vplayer) play(ctx context.Context) error {
if !vp.stopPos.IsZero() && vp.startPos.AtLeast(vp.stopPos) {
if !vp.stopPos.IsZero() && vp.gtidStartPos.AtLeast(vp.stopPos) {
if vp.saveStop {
return vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stop position %v already reached: %v", vp.startPos, vp.stopPos))
return vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stop position %v already reached: %v", vp.gtidStartPos, vp.stopPos))
}
return nil
}
Expand Down Expand Up @@ -121,7 +123,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {

streamErr := make(chan error, 1)
go func() {
streamErr <- vp.vr.sourceVStreamer.VStream(ctx, mysql.EncodePosition(vp.startPos), vp.replicatorPlan.VStreamFilter, func(events []*binlogdatapb.VEvent) error {
streamErr <- vp.vr.sourceVStreamer.VStream(ctx, vp.startPos, vp.replicatorPlan.VStreamFilter, func(events []*binlogdatapb.VEvent) error {
return relay.Send(events)
})
}()
Expand Down
Loading

0 comments on commit c3c238b

Please sign in to comment.