From 2724ba1e6f444cb798c8652c7fe6595c510acb86 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 2 Jan 2020 11:38:12 +0100 Subject: [PATCH 1/6] Added /debug/vrlog endpoint. Initial commit, untested Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/vplayer.go | 13 +- .../tabletmanager/vreplication/vrlog.go | 126 ++++++++++++++++++ .../tabletmanager/vreplication/vrlog_test.go | 49 +++++++ 3 files changed, 187 insertions(+), 1 deletion(-) create mode 100644 go/vt/vttablet/tabletmanager/vreplication/vrlog.go create mode 100644 go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 464a4fe46cb..9196d30b845 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -192,7 +192,10 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row } for _, change := range rowEvent.RowChanges { _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { - return vp.vr.dbClient.ExecuteWithRetry(ctx, sql) + stats := NewVrLogStats(ctx, "RowChange") + result, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) + stats.Record(sql) + return result, err }) if err != nil { return err @@ -307,6 +310,7 @@ func hasAnotherCommit(items [][]*binlogdatapb.VEvent, i, j int) bool { } func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error { + stats := NewVrLogStats(ctx, event.String()) switch event.Type { case binlogdatapb.VEventType_GTID: pos, err := mysql.DecodePosition(event.Gtid) @@ -352,6 +356,8 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return err } vp.tablePlans[event.FieldEvent.TableName] = tplan + stats.Record(fmt.Sprintf("%v", event.FieldEvent)) + case binlogdatapb.VEventType_INSERT, binlogdatapb.VEventType_DELETE, binlogdatapb.VEventType_UPDATE, binlogdatapb.VEventType_REPLACE: // This is a player using stament based replication if err := vp.vr.dbClient.Begin(); err != nil { @@ -361,6 +367,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.applyStmtEvent(ctx, event); err != nil { return err } + stats.Record(fmt.Sprintf(event.Dml)) case binlogdatapb.VEventType_ROW: // This player is configured for row based replication if err := vp.vr.dbClient.Begin(); err != nil { @@ -369,6 +376,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil { return err } + stats.Record(fmt.Sprintf("%v", event.RowEvent)) case binlogdatapb.VEventType_OTHER: if vp.vr.dbClient.InTransaction { // Unreachable @@ -417,6 +425,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Ddl); err != nil { return err } + stats.Record(fmt.Sprintf("%v", event.Ddl)) posReached, err := vp.updatePos(event.Timestamp) if err != nil { return err @@ -428,6 +437,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Ddl); err != nil { log.Infof("Ignoring error: %v for DDL: %s", err, event.Ddl) } + stats.Record(fmt.Sprintf("%v", event.Ddl)) posReached, err := vp.updatePos(event.Timestamp) if err != nil { return err @@ -481,6 +491,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m } return io.EOF } + stats.Record(fmt.Sprintf("%v", event.Journal)) return io.EOF case binlogdatapb.VEventType_HEARTBEAT: // No-op: heartbeat timings are calculated in outer loop. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go new file mode 100644 index 00000000000..6993230c8c2 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go @@ -0,0 +1,126 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "html/template" + "net/http" + "strconv" + "time" + + "golang.org/x/net/context" + "vitess.io/vitess/go/streamlog" + "vitess.io/vitess/go/vt/log" +) + +var ( + vrLogStatsLogger = streamlog.New("VReplication", 50) + vrLogStatsTemplate = template.Must(template.New("vrlog").Parse(` + {{.Type}}\t + {{.Detail}}\t + {{.Time}}\t + {{.DurationNs}}\n + `)) +) + +type VrLogStats struct { + Ctx context.Context + Type string + Detail string + Time time.Time + DurationNs int64 +} + +func (stats *VrLogStats) Send() { + vrLogStatsLogger.Send(stats) +} + +func (stats *VrLogStats) Record(detail string) { + if stats.Ctx == nil || stats.Time.IsZero() { + log.Error("VrLogStats not initialized") + } + stats.Detail = detail + stats.DurationNs = time.Since(stats.Time).Nanoseconds() + stats.Send() +} + +func NewVrLogStats(ctx context.Context, eventType string) *VrLogStats { + return &VrLogStats{Ctx: ctx, Type: eventType, Time: time.Now()} +} + +func init() { + http.HandleFunc("/debug/vrlog", func(w http.ResponseWriter, r *http.Request) { + ch := vrLogStatsLogger.Subscribe("vrlogstats") + defer vrLogStatsLogger.Unsubscribe(ch) + vrlogStatsHandler(ch, w, r) + }) +} + +func vrlogStatsHandler(ch chan interface{}, w http.ResponseWriter, r *http.Request) { + timeout, limit := parseTimeoutLimitParams(r) + + tmr := time.NewTimer(timeout) + defer tmr.Stop() + for i := 0; i < limit; i++ { + select { + case out := <-ch: + select { + case <-tmr.C: + return + default: + } + stats, ok := out.(*VrLogStats) + if !ok { + log.Error("Log received is not of type VrLogStats") + continue + } + if err := vrLogStatsTemplate.Execute(w, stats); err != nil { + log.Errorf("vrlog: couldn't execute template: %v", err) + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + case <-tmr.C: + return + } + } +} + +func parseTimeoutLimitParams(req *http.Request) (time.Duration, int) { + timeout := 1000 + limit := 300 + if ts, ok := req.URL.Query()["timeout"]; ok { + if t, err := strconv.Atoi(ts[0]); err == nil { + timeout = adjustValue(t, 0, 60) + } + } + if l, ok := req.URL.Query()["limit"]; ok { + if lim, err := strconv.Atoi(l[0]); err == nil { + limit = adjustValue(lim, 1, 200000) + } + } + return time.Duration(timeout) * time.Second, limit +} + +func adjustValue(val int, lower int, upper int) int { + if val < lower { + return lower + } else if val > upper { + return upper + } + return val +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go new file mode 100644 index 00000000000..479c8c46f51 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +func TestVrLog(t *testing.T) { + r, _ := http.NewRequest("GET", "/debug/vrlog?timeout=10&limit=1", nil) + w := httptest.NewRecorder() + + ch := vrLogStatsLogger.Subscribe("vrlog") + defer vrLogStatsLogger.Unsubscribe(ch) + go func() { + vrlogStatsHandler(ch, w, r) + }() + stats := VrLogStats{Ctx: context.Background()} + msg := "test msg" + stats.Detail = msg + stats.Send() + time.Sleep(1 * time.Millisecond) + s := w.Body.String() + + if !strings.Contains(s, msg) { //we use Contains since the printed log is in html and also prepends current time + t.Fatalf(fmt.Sprintf("want %s, got %s", msg, s)) + } + +} From cab5d7722688bb1cdf2816eb6b4580ca908bce47 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 3 Jan 2020 10:54:23 +0100 Subject: [PATCH 2/6] Improved test case. Modified template Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/vplayer.go | 4 +- .../tabletmanager/vreplication/vrlog.go | 24 ++++++------ .../tabletmanager/vreplication/vrlog_test.go | 37 +++++++++++++++---- 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 9196d30b845..1c89bc93e75 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -192,7 +192,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row } for _, change := range rowEvent.RowChanges { _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { - stats := NewVrLogStats(ctx, "RowChange") + stats := Fr(ctx, "ROWCHANGE") result, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) stats.Record(sql) return result, err @@ -310,7 +310,7 @@ func hasAnotherCommit(items [][]*binlogdatapb.VEvent, i, j int) bool { } func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error { - stats := NewVrLogStats(ctx, event.String()) + stats := NewVrLogStats(ctx, event.Type.String()) switch event.Type { case binlogdatapb.VEventType_GTID: pos, err := mysql.DecodePosition(event.Gtid) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go index 6993230c8c2..f754ffdc801 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go @@ -17,9 +17,9 @@ limitations under the License. package vreplication import ( - "html/template" "net/http" "strconv" + "text/template" "time" "golang.org/x/net/context" @@ -29,12 +29,8 @@ import ( var ( vrLogStatsLogger = streamlog.New("VReplication", 50) - vrLogStatsTemplate = template.Must(template.New("vrlog").Parse(` - {{.Type}}\t - {{.Detail}}\t - {{.Time}}\t - {{.DurationNs}}\n - `)) + vrLogStatsTemplate = template.Must(template.New("vrlog"). + Parse("{{.Type}} Event {{.Detail}} {{.Time}} {{.DurationNs}}\n")) ) type VrLogStats struct { @@ -45,21 +41,23 @@ type VrLogStats struct { DurationNs int64 } +func NewVrLogStats(ctx context.Context, eventType string) *VrLogStats { + return &VrLogStats{Ctx: ctx, Type: eventType, Time: time.Now()} +} + func (stats *VrLogStats) Send() { vrLogStatsLogger.Send(stats) } -func (stats *VrLogStats) Record(detail string) { +func (stats *VrLogStats) Record(detail string) bool { if stats.Ctx == nil || stats.Time.IsZero() { - log.Error("VrLogStats not initialized") + log.Error("VrLogStats not initialized for %s", detail) + return false } stats.Detail = detail stats.DurationNs = time.Since(stats.Time).Nanoseconds() stats.Send() -} - -func NewVrLogStats(ctx context.Context, eventType string) *VrLogStats { - return &VrLogStats{Ctx: ctx, Type: eventType, Time: time.Now()} + return true } func init() { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go index 479c8c46f51..0c23b089f2a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strconv" "strings" "testing" "time" @@ -35,15 +36,35 @@ func TestVrLog(t *testing.T) { go func() { vrlogStatsHandler(ch, w, r) }() - stats := VrLogStats{Ctx: context.Background()} - msg := "test msg" - stats.Detail = msg - stats.Send() + ctx := context.Background() + eventType, detail := "Test", "detail 1" + stats := NewVrLogStats(ctx, eventType) + time.Sleep(1 * time.Millisecond) + stats.Record(detail) time.Sleep(1 * time.Millisecond) s := w.Body.String() - - if !strings.Contains(s, msg) { //we use Contains since the printed log is in html and also prepends current time - t.Fatalf(fmt.Sprintf("want %s, got %s", msg, s)) + want := fmt.Sprintf("%s Event %s", eventType, detail) + if !strings.Contains(s, want) { + t.Fatalf(fmt.Sprintf("want %s, got %s", want, s)) + } + if strings.HasSuffix(s, "\\n") { + t.Fatalf("does not end in a newline: %s", s) + } + s = strings.TrimSuffix(s, "\n") + ss := strings.Split(s, " ") + numCols := 4 + if ss == nil || len(ss) != numCols { + t.Fatalf("log line should have %d columns, not %d, : %s", numCols, len(ss), strings.Join(ss, "|")) + } + lastColValue, err := strconv.Atoi(ss[len(ss)-1]) + if err != nil { + t.Fatalf("Duration is not an integer: %s", err) + } + if lastColValue < 1<<9 { + t.Fatalf("Waited 1 Millisecond, so duration should be greater than that: %d, %s", lastColValue, ss[len(ss)-1]) + } + stats = &VrLogStats{} + if stats.Record("should error out since stats is not initalized") != false { + t.Fatalf("Uninitialized stats should not log") } - } From 9c6bb47b5d308326a3ac8fec6df3c937b0d42a32 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 4 Jan 2020 15:46:07 +0100 Subject: [PATCH 3/6] Test cases for debug log. Log stream template changed Signed-off-by: Rohit Nayak --- go.sum | 1 + .../vreplication/framework_test.go | 49 +++++++++++++++++++ .../tabletmanager/vreplication/vplayer.go | 2 +- .../vreplication/vplayer_test.go | 25 ++++++++++ .../tabletmanager/vreplication/vrlog.go | 13 +++-- 5 files changed, 84 insertions(+), 6 deletions(-) diff --git a/go.sum b/go.sum index d5373e2df2e..4b7e97903c3 100644 --- a/go.sum +++ b/go.sum @@ -456,6 +456,7 @@ github.com/mitchellh/copystructure v0.0.0-20160804032330-cdac8253d00f/go.mod h1: github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b h1:9+ke9YJ9KGWw5ANXK6ozjoK47uI3uNbXv4YVINBnGm8= github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b/go.mod h1:r1VsdOzOPt1ZSrGZWFoNhsAedKnEd6r9Np1+5blZCWk= github.com/mitchellh/go-testing-interface v1.0.0 h1:fzU/JVNcaqHQEcVFAKeR41fkiLdIPrefOvVG1VZ96U0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 08c2b40465e..6f23c36dd31 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -53,8 +53,14 @@ var ( globalFBC = &fakeBinlogClient{} vrepldb = "vrepl" globalDBQueries = make(chan string, 1000) + globalLogs = make(chan interface{}, 1000) ) +type LogExpectation struct { + Type string + Detail string +} + func init() { tabletconn.RegisterDialer("test", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { return &fakeTabletConn{ @@ -391,6 +397,49 @@ func expectDeleteQueries(t *testing.T) { }) } +func listenToLogs() <-chan interface{} { + globalLogs = vrLogStatsLogger.Subscribe("vrlogstats") + return globalLogs +} + +func expectLogs(t *testing.T, logs []LogExpectation) { + t.Helper() + failed := false + for i, log := range logs { + if failed { + t.Errorf("no logs received") + continue + } + select { + case data := <-globalLogs: + got, ok := data.(*VrLogStats) + if !ok { + t.Errorf("got not ok casting to VrLogStats: %v", data) + } + var match bool + match = (log.Type == got.Type) + if match { + if log.Detail[0] == '/' { + result, err := regexp.MatchString(log.Detail[1:], got.Detail) + if err != nil { + panic(err) + } + match = result + } else { + match = (got.Detail == log.Detail) + } + } + + if !match { + t.Errorf("log:\n%q, does not match log %d:\n%q", got, i, log) + } + case <-time.After(5 * time.Second): + t.Errorf("no logs received, expecting %s", log) + failed = true + } + } +} + func expectDBClientQueries(t *testing.T, queries []string) { t.Helper() failed := false diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 1c89bc93e75..d9f5d342d9e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -192,7 +192,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row } for _, change := range rowEvent.RowChanges { _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { - stats := Fr(ctx, "ROWCHANGE") + stats := NewVrLogStats(ctx, "ROWCHANGE") result, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) stats.Record(sql) return result, err diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go index ab876697fa0..18aaa934aa8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -173,6 +173,7 @@ func TestPlayerFilters(t *testing.T) { Filter: filter, OnDdl: binlogdatapb.OnDDLAction_IGNORE, } + cancel, _ := startVReplication(t, bls, "") defer cancel() @@ -181,6 +182,7 @@ func TestPlayerFilters(t *testing.T) { output []string table string data [][]string + logs []LogExpectation }{{ // insert with insertNormal input: "insert into src1 values(1, 'aaa')", @@ -194,6 +196,11 @@ func TestPlayerFilters(t *testing.T) { data: [][]string{ {"1", "aaa"}, }, + logs: []LogExpectation{ + {"FIELD", "/src1.*id.*INT32.*val.*VARBINARY.*"}, + {"ROWCHANGE", "insert into dst1(id,val) values (1,'aaa')"}, + {"ROW", "/src1.*3.*1aaa.*"}, + }, }, { // update with insertNormal input: "update src1 set val='bbb'", @@ -207,6 +214,10 @@ func TestPlayerFilters(t *testing.T) { data: [][]string{ {"1", "bbb"}, }, + logs: []LogExpectation{ + {"ROWCHANGE", "update dst1 set val='bbb' where id=1"}, + {"ROW", "/src1.*3.*1aaa.*"}, + }, }, { // delete with insertNormal input: "delete from src1 where id=1", @@ -218,6 +229,10 @@ func TestPlayerFilters(t *testing.T) { }, table: "dst1", data: [][]string{}, + logs: []LogExpectation{ + {"ROWCHANGE", "delete from dst1 where id=1"}, + {"ROW", "/src1.*3.*1bbb.*"}, + }, }, { // insert with insertOnDup input: "insert into src2 values(1, 2, 3)", @@ -231,6 +246,10 @@ func TestPlayerFilters(t *testing.T) { data: [][]string{ {"1", "2", "3", "1"}, }, + logs: []LogExpectation{ + {"FIELD", "/src2.*id.*val1.*val2.*"}, + {"ROWCHANGE", "insert into dst2(id,val1,sval2,rcount) values (1,2,ifnull(3, 0),1) on duplicate key update val1=values(val1), sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1"}, + }, }, { // update with insertOnDup input: "update src2 set val1=5, val2=1 where id=1", @@ -244,6 +263,10 @@ func TestPlayerFilters(t *testing.T) { data: [][]string{ {"1", "5", "1", "1"}, }, + logs: []LogExpectation{ + {"ROW", "/src2.*123.*"}, + {"ROWCHANGE", "update dst2 set val1=5, sval2=sval2-ifnull(3, 0)+ifnull(1, 0), rcount=rcount where id=1"}, + }, }, { // delete with insertOnDup input: "delete from src2 where id=1", @@ -365,9 +388,11 @@ func TestPlayerFilters(t *testing.T) { data: [][]string{}, }} + listenToLogs() for _, tcases := range testcases { execStatements(t, []string{tcases.input}) expectDBClientQueries(t, tcases.output) + expectLogs(t, tcases.logs) if tcases.table != "" { expectData(t, tcases.table, tcases.data) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go index f754ffdc801..cda0a1d850d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go @@ -30,19 +30,22 @@ import ( var ( vrLogStatsLogger = streamlog.New("VReplication", 50) vrLogStatsTemplate = template.Must(template.New("vrlog"). - Parse("{{.Type}} Event {{.Detail}} {{.Time}} {{.DurationNs}}\n")) + Parse("{{.Type}} Event {{.Detail}} {{.LogTime}} {{.DurationNs}}\n")) ) type VrLogStats struct { Ctx context.Context Type string Detail string - Time time.Time + StartTime time.Time + LogTime string DurationNs int64 } func NewVrLogStats(ctx context.Context, eventType string) *VrLogStats { - return &VrLogStats{Ctx: ctx, Type: eventType, Time: time.Now()} + stats := &VrLogStats{Ctx: ctx, Type: eventType, StartTime: time.Now()} + stats.LogTime = stats.StartTime.Format("2006-01-02T15:04:05") + return stats } func (stats *VrLogStats) Send() { @@ -50,12 +53,12 @@ func (stats *VrLogStats) Send() { } func (stats *VrLogStats) Record(detail string) bool { - if stats.Ctx == nil || stats.Time.IsZero() { + if stats.Ctx == nil || stats.StartTime.IsZero() { log.Error("VrLogStats not initialized for %s", detail) return false } stats.Detail = detail - stats.DurationNs = time.Since(stats.Time).Nanoseconds() + stats.DurationNs = time.Since(stats.StartTime).Nanoseconds() stats.Send() return true } From bdae6b26c66eea60ca7adef53ffda9665b0ce89d Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 4 Jan 2020 18:34:23 +0100 Subject: [PATCH 4/6] Code cleanup Signed-off-by: Rohit Nayak --- .../vreplication/framework_test.go | 3 +-- .../tabletmanager/vreplication/vplayer.go | 14 +++++++------- .../tabletmanager/vreplication/vrlog.go | 19 ++++++++++--------- .../tabletmanager/vreplication/vrlog_test.go | 11 +++++++---- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 6f23c36dd31..5c86ef3cf6c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -397,9 +397,8 @@ func expectDeleteQueries(t *testing.T) { }) } -func listenToLogs() <-chan interface{} { +func listenToLogs() { globalLogs = vrLogStatsLogger.Subscribe("vrlogstats") - return globalLogs } func expectLogs(t *testing.T, logs []LogExpectation) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index d9f5d342d9e..93dc3fd4fae 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -194,7 +194,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { stats := NewVrLogStats(ctx, "ROWCHANGE") result, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) - stats.Record(sql) + stats.Send(sql) return result, err }) if err != nil { @@ -356,7 +356,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return err } vp.tablePlans[event.FieldEvent.TableName] = tplan - stats.Record(fmt.Sprintf("%v", event.FieldEvent)) + stats.Send(fmt.Sprintf("%v", event.FieldEvent)) case binlogdatapb.VEventType_INSERT, binlogdatapb.VEventType_DELETE, binlogdatapb.VEventType_UPDATE, binlogdatapb.VEventType_REPLACE: // This is a player using stament based replication @@ -367,7 +367,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.applyStmtEvent(ctx, event); err != nil { return err } - stats.Record(fmt.Sprintf(event.Dml)) + stats.Send(fmt.Sprintf(event.Dml)) case binlogdatapb.VEventType_ROW: // This player is configured for row based replication if err := vp.vr.dbClient.Begin(); err != nil { @@ -376,7 +376,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil { return err } - stats.Record(fmt.Sprintf("%v", event.RowEvent)) + stats.Send(fmt.Sprintf("%v", event.RowEvent)) case binlogdatapb.VEventType_OTHER: if vp.vr.dbClient.InTransaction { // Unreachable @@ -425,7 +425,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Ddl); err != nil { return err } - stats.Record(fmt.Sprintf("%v", event.Ddl)) + stats.Send(fmt.Sprintf("%v", event.Ddl)) posReached, err := vp.updatePos(event.Timestamp) if err != nil { return err @@ -437,7 +437,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Ddl); err != nil { log.Infof("Ignoring error: %v for DDL: %s", err, event.Ddl) } - stats.Record(fmt.Sprintf("%v", event.Ddl)) + stats.Send(fmt.Sprintf("%v", event.Ddl)) posReached, err := vp.updatePos(event.Timestamp) if err != nil { return err @@ -491,7 +491,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m } return io.EOF } - stats.Record(fmt.Sprintf("%v", event.Journal)) + stats.Send(fmt.Sprintf("%v", event.Journal)) return io.EOF case binlogdatapb.VEventType_HEARTBEAT: // No-op: heartbeat timings are calculated in outer loop. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go index cda0a1d850d..9c38e4bd878 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go @@ -14,6 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. */ +/* + * A human readable streaming log of vreplication events for vttablets available at /debug/vrlog + */ + package vreplication import ( @@ -33,6 +37,7 @@ var ( Parse("{{.Type}} Event {{.Detail}} {{.LogTime}} {{.DurationNs}}\n")) ) +//VrLogStats collects attributes of a vreplication event for logging type VrLogStats struct { Ctx context.Context Type string @@ -42,25 +47,21 @@ type VrLogStats struct { DurationNs int64 } +//NewVrLogStats should be called at the start of the event to be logged func NewVrLogStats(ctx context.Context, eventType string) *VrLogStats { stats := &VrLogStats{Ctx: ctx, Type: eventType, StartTime: time.Now()} stats.LogTime = stats.StartTime.Format("2006-01-02T15:04:05") return stats } -func (stats *VrLogStats) Send() { - vrLogStatsLogger.Send(stats) -} - -func (stats *VrLogStats) Record(detail string) bool { +//Send records the log event, should be called on a stats object constructed by NewVrLogStats() +func (stats *VrLogStats) Send(detail string) { if stats.Ctx == nil || stats.StartTime.IsZero() { - log.Error("VrLogStats not initialized for %s", detail) - return false + panic("VrLogStats not initialized " + detail) } stats.Detail = detail stats.DurationNs = time.Since(stats.StartTime).Nanoseconds() - stats.Send() - return true + vrLogStatsLogger.Send(stats) } func init() { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go index 0c23b089f2a..70bdf5563cf 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go @@ -40,7 +40,7 @@ func TestVrLog(t *testing.T) { eventType, detail := "Test", "detail 1" stats := NewVrLogStats(ctx, eventType) time.Sleep(1 * time.Millisecond) - stats.Record(detail) + stats.Send(detail) time.Sleep(1 * time.Millisecond) s := w.Body.String() want := fmt.Sprintf("%s Event %s", eventType, detail) @@ -63,8 +63,11 @@ func TestVrLog(t *testing.T) { if lastColValue < 1<<9 { t.Fatalf("Waited 1 Millisecond, so duration should be greater than that: %d, %s", lastColValue, ss[len(ss)-1]) } + defer func() { + if err := recover(); err == nil { + t.Fatalf("Uninitialized stats should not log") + } + }() stats = &VrLogStats{} - if stats.Record("should error out since stats is not initalized") != false { - t.Fatalf("Uninitialized stats should not log") - } + stats.Send("should error out since stats is not initalized") } From 7117000342de0fdf58b6c2199952fa1b459b7381 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 6 Jan 2020 16:39:09 +0100 Subject: [PATCH 5/6] Addressed review comments. Added http test class to handle races Signed-off-by: Rohit Nayak --- .../vreplication/framework_test.go | 10 +--- .../vreplication/http_stream_mock.go | 59 +++++++++++++++++++ .../tabletmanager/vreplication/vplayer.go | 5 +- .../vreplication/vplayer_test.go | 20 ++++--- .../tabletmanager/vreplication/vrlog.go | 15 ++--- .../tabletmanager/vreplication/vrlog_test.go | 53 +++++++++++------ 6 files changed, 117 insertions(+), 45 deletions(-) create mode 100644 go/vt/vttablet/tabletmanager/vreplication/http_stream_mock.go diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 5c86ef3cf6c..c51a5881545 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -53,7 +53,6 @@ var ( globalFBC = &fakeBinlogClient{} vrepldb = "vrepl" globalDBQueries = make(chan string, 1000) - globalLogs = make(chan interface{}, 1000) ) type LogExpectation struct { @@ -397,11 +396,8 @@ func expectDeleteQueries(t *testing.T) { }) } -func listenToLogs() { - globalLogs = vrLogStatsLogger.Subscribe("vrlogstats") -} - -func expectLogs(t *testing.T, logs []LogExpectation) { +func expectLogsAndUnsubscribe(t *testing.T, logs []LogExpectation, logCh chan interface{}) { + defer vrLogStatsLogger.Unsubscribe(logCh) t.Helper() failed := false for i, log := range logs { @@ -410,7 +406,7 @@ func expectLogs(t *testing.T, logs []LogExpectation) { continue } select { - case data := <-globalLogs: + case data := <-logCh: got, ok := data.(*VrLogStats) if !ok { t.Errorf("got not ok casting to VrLogStats: %v", data) diff --git a/go/vt/vttablet/tabletmanager/vreplication/http_stream_mock.go b/go/vt/vttablet/tabletmanager/vreplication/http_stream_mock.go new file mode 100644 index 00000000000..8b640969e5d --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/http_stream_mock.go @@ -0,0 +1,59 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* + * To be used instead of HttpTest in cases where a test needs to do synchronized sequential reads are required from the + * http stream. Note that this handles only one write at a time: the test should read the written data before the next + * write. Data is sent to the channel only on a Flush. + */ + +package vreplication + +import ( + "net/http" +) + +//HTTPStreamWriterMock implements http.ResponseWriter and adds a channel to sync writes and reads +type HTTPStreamWriterMock struct { + ch chan interface{} + data []byte +} + +//NewHTTPStreamWriterMock returns a new HTTPStreamWriterMock +func NewHTTPStreamWriterMock() *HTTPStreamWriterMock { + return &HTTPStreamWriterMock{ch: make(chan interface{}, 1), data: make([]byte, 0)} +} + +//Header is a stub +func (w *HTTPStreamWriterMock) Header() http.Header { + return nil +} + +//WriteHeader is a stub +func (w *HTTPStreamWriterMock) WriteHeader(statuscode int) { +} + +//Write buffers sent data +func (w *HTTPStreamWriterMock) Write(data []byte) (int, error) { + w.data = append(w.data, data...) + return 0, nil +} + +//Flush sends buffered data to the channel +func (w *HTTPStreamWriterMock) Flush() { + w.ch <- w.data + w.data = w.data[:0] +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 93dc3fd4fae..85159a8bfb8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -192,7 +192,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row } for _, change := range rowEvent.RowChanges { _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { - stats := NewVrLogStats(ctx, "ROWCHANGE") + stats := NewVrLogStats("ROWCHANGE") result, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) stats.Send(sql) return result, err @@ -310,7 +310,7 @@ func hasAnotherCommit(items [][]*binlogdatapb.VEvent, i, j int) bool { } func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error { - stats := NewVrLogStats(ctx, event.Type.String()) + stats := NewVrLogStats(event.Type.String()) switch event.Type { case binlogdatapb.VEventType_GTID: pos, err := mysql.DecodePosition(event.Gtid) @@ -376,6 +376,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil { return err } + //Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed time for the Row event stats.Send(fmt.Sprintf("%v", event.RowEvent)) case binlogdatapb.VEventType_OTHER: if vp.vr.dbClient.InTransaction { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go index 18aaa934aa8..6bb77ecfa1c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -182,7 +182,7 @@ func TestPlayerFilters(t *testing.T) { output []string table string data [][]string - logs []LogExpectation + logs []LogExpectation //logs defined only for a few testcases since they are enough to test all log events }{{ // insert with insertNormal input: "insert into src1 values(1, 'aaa')", @@ -264,8 +264,8 @@ func TestPlayerFilters(t *testing.T) { {"1", "5", "1", "1"}, }, logs: []LogExpectation{ - {"ROW", "/src2.*123.*"}, {"ROWCHANGE", "update dst2 set val1=5, sval2=sval2-ifnull(3, 0)+ifnull(1, 0), rcount=rcount where id=1"}, + {"ROW", "/src2.*123.*"}, }, }, { // delete with insertOnDup @@ -388,13 +388,15 @@ func TestPlayerFilters(t *testing.T) { data: [][]string{}, }} - listenToLogs() - for _, tcases := range testcases { - execStatements(t, []string{tcases.input}) - expectDBClientQueries(t, tcases.output) - expectLogs(t, tcases.logs) - if tcases.table != "" { - expectData(t, tcases.table, tcases.data) + for _, tcase := range testcases { + if tcase.logs != nil { + logch := vrLogStatsLogger.Subscribe("vrlogstats") + defer expectLogsAndUnsubscribe(t, tcase.logs, logch) + } + execStatements(t, []string{tcase.input}) + expectDBClientQueries(t, tcase.output) + if tcase.table != "" { + expectData(t, tcase.table, tcase.data) } } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go index 9c38e4bd878..5fb75d6f7d2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go @@ -26,7 +26,6 @@ import ( "text/template" "time" - "golang.org/x/net/context" "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/log" ) @@ -39,7 +38,6 @@ var ( //VrLogStats collects attributes of a vreplication event for logging type VrLogStats struct { - Ctx context.Context Type string Detail string StartTime time.Time @@ -48,17 +46,17 @@ type VrLogStats struct { } //NewVrLogStats should be called at the start of the event to be logged -func NewVrLogStats(ctx context.Context, eventType string) *VrLogStats { - stats := &VrLogStats{Ctx: ctx, Type: eventType, StartTime: time.Now()} - stats.LogTime = stats.StartTime.Format("2006-01-02T15:04:05") - return stats +func NewVrLogStats(eventType string) *VrLogStats { + return &VrLogStats{Type: eventType, StartTime: time.Now()} } //Send records the log event, should be called on a stats object constructed by NewVrLogStats() func (stats *VrLogStats) Send(detail string) { - if stats.Ctx == nil || stats.StartTime.IsZero() { - panic("VrLogStats not initialized " + detail) + if stats.StartTime.IsZero() { + stats.Type = "Error: Type not specified" + stats.StartTime = time.Now() } + stats.LogTime = stats.StartTime.Format("2006-01-02T15:04:05") stats.Detail = detail stats.DurationNs = time.Since(stats.StartTime).Nanoseconds() vrLogStatsLogger.Send(stats) @@ -74,7 +72,6 @@ func init() { func vrlogStatsHandler(ch chan interface{}, w http.ResponseWriter, r *http.Request) { timeout, limit := parseTimeoutLimitParams(r) - tmr := time.NewTimer(timeout) defer tmr.Stop() for i := 0; i < limit; i++ { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go index 70bdf5563cf..76785ee72d7 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go @@ -17,10 +17,8 @@ limitations under the License. package vreplication import ( - "context" "fmt" "net/http" - "net/http/httptest" "strconv" "strings" "testing" @@ -28,21 +26,30 @@ import ( ) func TestVrLog(t *testing.T) { - r, _ := http.NewRequest("GET", "/debug/vrlog?timeout=10&limit=1", nil) - w := httptest.NewRecorder() + r, _ := http.NewRequest("GET", "/debug/vrlog?timeout=100&limit=10", nil) + //w := httptest.NewRecorder() - ch := vrLogStatsLogger.Subscribe("vrlog") + w := NewHTTPStreamWriterMock() + + ch := vrLogStatsLogger.Subscribe("vrlogstats") defer vrLogStatsLogger.Unsubscribe(ch) go func() { vrlogStatsHandler(ch, w, r) }() - ctx := context.Background() eventType, detail := "Test", "detail 1" - stats := NewVrLogStats(ctx, eventType) - time.Sleep(1 * time.Millisecond) + stats := NewVrLogStats(eventType) stats.Send(detail) - time.Sleep(1 * time.Millisecond) - s := w.Body.String() + var s string + select { + case ret := <-w.ch: + b, ok := ret.([]byte) + if ok { + s = string(b) + } + case <-time.After(1 * time.Second): + s = "Timed out" + } + want := fmt.Sprintf("%s Event %s", eventType, detail) if !strings.Contains(s, want) { t.Fatalf(fmt.Sprintf("want %s, got %s", want, s)) @@ -60,14 +67,24 @@ func TestVrLog(t *testing.T) { if err != nil { t.Fatalf("Duration is not an integer: %s", err) } - if lastColValue < 1<<9 { - t.Fatalf("Waited 1 Millisecond, so duration should be greater than that: %d, %s", lastColValue, ss[len(ss)-1]) + if lastColValue == 0 { + t.Fatalf("Duration should not be zero") } - defer func() { - if err := recover(); err == nil { - t.Fatalf("Uninitialized stats should not log") - } - }() + stats = &VrLogStats{} - stats.Send("should error out since stats is not initalized") + stats.Send("detail123") + + select { + case ret := <-w.ch: + b, ok := ret.([]byte) + if ok { + s = string(b) + } + case <-time.After(1 * time.Second): + s = "Timed out" + } + prefix := "Error: Type not specified" + if !strings.HasPrefix(s, prefix) { + t.Fatalf("Incorrect Type for uninitialized stat, got %v", s) + } } From f174a555bd7979a89fb507779c536855eee509ef Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 6 Jan 2020 21:13:36 +0100 Subject: [PATCH 6/6] Addressed review comments Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/framework_test.go | 2 +- .../tabletmanager/vreplication/http_stream_mock.go | 12 ++++++------ .../tabletmanager/vreplication/vplayer_test.go | 2 +- go/vt/vttablet/tabletmanager/vreplication/vrlog.go | 6 +++--- .../tabletmanager/vreplication/vrlog_test.go | 2 -- 5 files changed, 11 insertions(+), 13 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index c51a5881545..4cbcc35ea5d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -397,8 +397,8 @@ func expectDeleteQueries(t *testing.T) { } func expectLogsAndUnsubscribe(t *testing.T, logs []LogExpectation, logCh chan interface{}) { - defer vrLogStatsLogger.Unsubscribe(logCh) t.Helper() + defer vrLogStatsLogger.Unsubscribe(logCh) failed := false for i, log := range logs { if failed { diff --git a/go/vt/vttablet/tabletmanager/vreplication/http_stream_mock.go b/go/vt/vttablet/tabletmanager/vreplication/http_stream_mock.go index 8b640969e5d..ab9d9f3e220 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/http_stream_mock.go +++ b/go/vt/vttablet/tabletmanager/vreplication/http_stream_mock.go @@ -26,33 +26,33 @@ import ( "net/http" ) -//HTTPStreamWriterMock implements http.ResponseWriter and adds a channel to sync writes and reads +// HTTPStreamWriterMock implements http.ResponseWriter and adds a channel to sync writes and reads type HTTPStreamWriterMock struct { ch chan interface{} data []byte } -//NewHTTPStreamWriterMock returns a new HTTPStreamWriterMock +// NewHTTPStreamWriterMock returns a new HTTPStreamWriterMock func NewHTTPStreamWriterMock() *HTTPStreamWriterMock { return &HTTPStreamWriterMock{ch: make(chan interface{}, 1), data: make([]byte, 0)} } -//Header is a stub +// Header is a stub func (w *HTTPStreamWriterMock) Header() http.Header { return nil } -//WriteHeader is a stub +// WriteHeader is a stub func (w *HTTPStreamWriterMock) WriteHeader(statuscode int) { } -//Write buffers sent data +// Write buffers sent data func (w *HTTPStreamWriterMock) Write(data []byte) (int, error) { w.data = append(w.data, data...) return 0, nil } -//Flush sends buffered data to the channel +// Flush sends buffered data to the channel func (w *HTTPStreamWriterMock) Flush() { w.ch <- w.data w.data = w.data[:0] diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go index 6bb77ecfa1c..dd95e70912a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -182,7 +182,7 @@ func TestPlayerFilters(t *testing.T) { output []string table string data [][]string - logs []LogExpectation //logs defined only for a few testcases since they are enough to test all log events + logs []LogExpectation // logs are defined for a few testcases since they are enough to test all log events }{{ // insert with insertNormal input: "insert into src1 values(1, 'aaa')", diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go index 5fb75d6f7d2..1f1bcac9599 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vrlog.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog.go @@ -36,7 +36,7 @@ var ( Parse("{{.Type}} Event {{.Detail}} {{.LogTime}} {{.DurationNs}}\n")) ) -//VrLogStats collects attributes of a vreplication event for logging +// VrLogStats collects attributes of a vreplication event for logging type VrLogStats struct { Type string Detail string @@ -45,12 +45,12 @@ type VrLogStats struct { DurationNs int64 } -//NewVrLogStats should be called at the start of the event to be logged +// NewVrLogStats should be called at the start of the event to be logged func NewVrLogStats(eventType string) *VrLogStats { return &VrLogStats{Type: eventType, StartTime: time.Now()} } -//Send records the log event, should be called on a stats object constructed by NewVrLogStats() +// Send records the log event, should be called on a stats object constructed by NewVrLogStats() func (stats *VrLogStats) Send(detail string) { if stats.StartTime.IsZero() { stats.Type = "Error: Type not specified" diff --git a/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go index 76785ee72d7..3bd0dcf3217 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vrlog_test.go @@ -27,8 +27,6 @@ import ( func TestVrLog(t *testing.T) { r, _ := http.NewRequest("GET", "/debug/vrlog?timeout=100&limit=10", nil) - //w := httptest.NewRecorder() - w := NewHTTPStreamWriterMock() ch := vrLogStatsLogger.Subscribe("vrlogstats")