From f9292d2e30d7b14eac216b680589eec0cffdb986 Mon Sep 17 00:00:00 2001 From: ffffwh Date: Fri, 3 Sep 2021 18:10:23 +0800 Subject: [PATCH 1/7] applier: support splitted big-tx #779 --- drivers/mysql/common/binlog.go | 4 + drivers/mysql/common/type.schema | 6 +- drivers/mysql/common/type.schema.gen.go | 33 +++++- drivers/mysql/mysql/applier_incr.go | 135 ++++++++++++++---------- 4 files changed, 116 insertions(+), 62 deletions(-) diff --git a/drivers/mysql/common/binlog.go b/drivers/mysql/common/binlog.go index cd611f33c..27366128a 100644 --- a/drivers/mysql/common/binlog.go +++ b/drivers/mysql/common/binlog.go @@ -41,6 +41,10 @@ func (b *BinlogEntry) HasDDL() bool { return false } +func (b *BinlogEntry) IsPartOfBigTx() bool { + return !(b.Index == 0 && b.Final) +} + // Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned func (b *BinlogEntry) String() string { return fmt.Sprintf("[BinlogEntry at %+v]", b.Coordinates) diff --git a/drivers/mysql/common/type.schema b/drivers/mysql/common/type.schema index 93359a0e3..64c56c35b 100644 --- a/drivers/mysql/common/type.schema +++ b/drivers/mysql/common/type.schema @@ -65,8 +65,10 @@ struct DataEvent { } struct BinlogEntry { - Coordinates BinlogCoordinateTx - Events []DataEvent + Coordinates BinlogCoordinateTx + Events []DataEvent + Index int32 + Final bool } struct BinlogEntries { diff --git a/drivers/mysql/common/type.schema.gen.go b/drivers/mysql/common/type.schema.gen.go index d2e4d034d..12c7110f2 100644 --- a/drivers/mysql/common/type.schema.gen.go +++ b/drivers/mysql/common/type.schema.gen.go @@ -2529,6 +2529,8 @@ func (d *DataEvent) Unmarshal(buf []byte) (uint64, error) { type BinlogEntry struct { Coordinates BinlogCoordinateTx Events []DataEvent + Index int32 + Final bool } func (d *BinlogEntry) Size() (s uint64) { @@ -2559,6 +2561,7 @@ func (d *BinlogEntry) Size() (s uint64) { } } + s += 5 return } func (d *BinlogEntry) Marshal(buf []byte) ([]byte, error) { @@ -2607,7 +2610,25 @@ func (d *BinlogEntry) Marshal(buf []byte) ([]byte, error) { } } - return buf[:i+0], nil + { + + buf[i+0+0] = byte(d.Index >> 0) + + buf[i+1+0] = byte(d.Index >> 8) + + buf[i+2+0] = byte(d.Index >> 16) + + buf[i+3+0] = byte(d.Index >> 24) + + } + { + if d.Final { + buf[i+4] = 1 + } else { + buf[i+4] = 0 + } + } + return buf[:i+5], nil } func (d *BinlogEntry) Unmarshal(buf []byte) (uint64, error) { @@ -2654,7 +2675,15 @@ func (d *BinlogEntry) Unmarshal(buf []byte) (uint64, error) { } } - return i + 0, nil + { + + d.Index = 0 | (int32(buf[i+0+0]) << 0) | (int32(buf[i+1+0]) << 8) | (int32(buf[i+2+0]) << 16) | (int32(buf[i+3+0]) << 24) + + } + { + d.Final = buf[i+4] == 1 + } + return i + 5, nil } type BinlogEntries struct { diff --git a/drivers/mysql/mysql/applier_incr.go b/drivers/mysql/mysql/applier_incr.go index 6ecdfcb5a..32974c3bd 100644 --- a/drivers/mysql/mysql/applier_incr.go +++ b/drivers/mysql/mysql/applier_incr.go @@ -215,9 +215,12 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.BinlogEntryContext) (err erro a.EntryCommittedHook(binlogEntry) // make gtid continuous return nil } - // region TestIfExecuted txSid := binlogEntry.Coordinates.GetSid() + // Note: the gtidExecuted will be updated after commit. For a big-tx, we determine + // whether to skip for each parts. + + // region TestIfExecuted gtidSetItem := a.gtidItemMap.GetItem(binlogEntry.Coordinates.SID) txExecuted := func() bool { a.gtidSetLock.RLock() @@ -259,60 +262,74 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.BinlogEntryContext) (err erro return err } } else { - if rotated { - a.logger.Debug("binlog rotated", "file", a.replayingBinlogFile) - if !a.mtsManager.WaitForAllCommitted() { - return nil // TODO shutdown + if binlogEntry.Index == 0 { + if rotated { + a.logger.Debug("binlog rotated", "file", a.replayingBinlogFile) + if !a.mtsManager.WaitForAllCommitted() { + return nil // TODO shutdown + } + a.mtsManager.lastCommitted = 0 + a.mtsManager.lastEnqueue = 0 + a.wsManager.resetCommonParent(0) + nPending := len(a.mtsManager.m) + if nPending != 0 { + a.logger.Warn("DTLE_BUG: lcPendingTx should be 0", "nPending", nPending, + "file", a.replayingBinlogFile, "gno", binlogEntry.Coordinates.GNO) + } + } + + // If there are TXs skipped by udup source-side + if a.mtsManager.lastEnqueue+1 < binlogEntry.Coordinates.SeqenceNumber { + a.logger.Info("found skipping seq_num", + "lastEnqueue", a.mtsManager.lastEnqueue, "seqNum", binlogEntry.Coordinates.SeqenceNumber, + "uuid", txSid, "gno", binlogEntry.Coordinates.GNO) } - a.mtsManager.lastCommitted = 0 - a.mtsManager.lastEnqueue = 0 - a.wsManager.resetCommonParent(0) - nPending := len(a.mtsManager.m) - if nPending != 0 { - a.logger.Warn("DTLE_BUG: lcPendingTx should be 0", "nPending", nPending, - "file", a.replayingBinlogFile, "gno", binlogEntry.Coordinates.GNO) + for a.mtsManager.lastEnqueue+1 < binlogEntry.Coordinates.SeqenceNumber { + a.mtsManager.lastEnqueue += 1 + a.mtsManager.chExecuted <- a.mtsManager.lastEnqueue } - } - // If there are TXs skipped by udup source-side - if a.mtsManager.lastEnqueue + 1 < binlogEntry.Coordinates.SeqenceNumber { - a.logger.Info("found skipping seq_num", - "lastEnqueue", a.mtsManager.lastEnqueue, "seqNum", binlogEntry.Coordinates.SeqenceNumber, - "uuid", txSid, "gno", binlogEntry.Coordinates.GNO) - } - for a.mtsManager.lastEnqueue+1 < binlogEntry.Coordinates.SeqenceNumber { - a.mtsManager.lastEnqueue += 1 - a.mtsManager.chExecuted <- a.mtsManager.lastEnqueue - } + hasDDL := binlogEntry.HasDDL() + // DDL must be executed separatedly + if hasDDL || a.prevDDL { + a.logger.Debug("MTS found DDL. WaitForAllCommitted", + "gno", binlogEntry.Coordinates.GNO, "hasDDL", hasDDL, "prevDDL", a.prevDDL) + if !a.mtsManager.WaitForAllCommitted() { + return nil // shutdown + } + } + a.prevDDL = hasDDL - hasDDL := binlogEntry.HasDDL() - // DDL must be executed separatedly - if hasDDL || a.prevDDL { - a.logger.Debug("MTS found DDL. WaitForAllCommitted", - "gno", binlogEntry.Coordinates.GNO, "hasDDL", hasDDL, "prevDDL", a.prevDDL) - if !a.mtsManager.WaitForAllCommitted() { - return nil // shutdown + if binlogEntry.IsPartOfBigTx() { + if !a.mtsManager.WaitForAllCommitted() { + return nil // shutdown + } + a.wsManager.resetCommonParent(binlogEntry.Coordinates.SeqenceNumber) + } else if !a.mysqlContext.UseMySQLDependency { + newLC := a.wsManager.GatLastCommit(entryCtx) + binlogEntry.Coordinates.LastCommitted = newLC + a.logger.Debug("WritesetManager", "lc", newLC, "seq", binlogEntry.Coordinates.SeqenceNumber, + "gno", binlogEntry.Coordinates.GNO) } } - a.prevDDL = hasDDL err = a.setTableItemForBinlogEntry(entryCtx) if err != nil { return err } - if !a.mysqlContext.UseMySQLDependency { - newLC := a.wsManager.GatLastCommit(entryCtx) - binlogEntry.Coordinates.LastCommitted = newLC - a.logger.Debug("WritesetManager", "lc", newLC, "seq", binlogEntry.Coordinates.SeqenceNumber, - "gno", binlogEntry.Coordinates.GNO) - } - - if !a.mtsManager.WaitForExecution(binlogEntry) { - return nil // shutdown + if binlogEntry.IsPartOfBigTx() { + err = a.ApplyBinlogEvent(0, entryCtx) + if err != nil { + return err + } + } else { + if !a.mtsManager.WaitForExecution(binlogEntry) { + return nil // shutdown + } + a.logger.Debug("a binlogEntry MTS enqueue.", "gno", binlogEntry.Coordinates.GNO) + a.applyBinlogMtsTxQueue <- entryCtx } - a.logger.Debug("a binlogEntry MTS enqueue.", "gno", binlogEntry.Coordinates.GNO) - a.applyBinlogMtsTxQueue <- entryCtx } return nil } @@ -561,26 +578,28 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Bin timestamp = event.Timestamp } - if !a.SkipGtidExecutedTable { - logger.Debug("insert gno", "gno", binlogEntry.Coordinates.GNO) - _, err = dbApplier.PsInsertExecutedGtid.ExecContext(a.ctx, + if binlogEntry.Final { + if !a.SkipGtidExecutedTable { + logger.Debug("insert gno", "gno", binlogEntry.Coordinates.GNO) + _, err = dbApplier.PsInsertExecutedGtid.ExecContext(a.ctx, a.subject, uuid.UUID(binlogEntry.Coordinates.SID).Bytes(), binlogEntry.Coordinates.GNO) - if err != nil { - return errors.Wrap(err, "insert gno") + if err != nil { + return errors.Wrap(err, "insert gno") + } } - } - if err := dbApplier.Tx.Commit(); err != nil { - a.OnError(common.TaskStateDead, err) - } else { - a.mtsManager.Executed(binlogEntry) - a.EntryCommittedHook(binlogEntry) - } - dbApplier.Tx = nil - if a.printTps { - atomic.AddUint32(&a.txLastNSeconds, 1) + if err := dbApplier.Tx.Commit(); err != nil { + a.OnError(common.TaskStateDead, err) + } else { + a.mtsManager.Executed(binlogEntry) + a.EntryCommittedHook(binlogEntry) + } + dbApplier.Tx = nil + if a.printTps { + atomic.AddUint32(&a.txLastNSeconds, 1) + } + atomic.AddUint32(&a.appliedTxCount, 1) } - atomic.AddUint32(&a.appliedTxCount, 1) // no error a.mysqlContext.Stage = common.StageWaitingForGtidToBeCommitted From f164dd037f6ae5b1bbf1c754d9cee357e9e1bfbd Mon Sep 17 00:00:00 2001 From: ffffwh Date: Mon, 6 Sep 2021 13:35:45 +0800 Subject: [PATCH 2/7] binlog_reader: refactory --- drivers/mysql/mysql/binlog/binlog_reader.go | 47 +++++++++++---------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/drivers/mysql/mysql/binlog/binlog_reader.go b/drivers/mysql/mysql/binlog/binlog_reader.go index e47220ddc..65725e98c 100644 --- a/drivers/mysql/mysql/binlog/binlog_reader.go +++ b/drivers/mysql/mysql/binlog/binlog_reader.go @@ -78,7 +78,6 @@ type BinlogReader struct { // dynamic config, include all tables (implicitly assigned or dynamically created) tables map[string](map[string]*common.TableContext) - currentBinlogEntry *common.BinlogEntry hasBeginQuery bool entryContext *common.BinlogEntryContext ReMap map[string]*regexp.Regexp // This is a cache for regexp. @@ -460,21 +459,23 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c u, _ := uuid.FromBytes(evt.SID) entry := common.NewBinlogEntry() - entry.Coordinates.SID = u - entry.Coordinates.GNO = evt.GNO - entry.Coordinates.LastCommitted = evt.LastCommitted - entry.Coordinates.SeqenceNumber = evt.SequenceNumber - entry.Coordinates.LogFile = b.currentCoord.LogFile + entry.Coordinates = common.BinlogCoordinateTx{ + LogFile: b.currentCoord.LogFile, + LogPos: int64(ev.Header.LogPos), + SID: u, + GNO: evt.GNO, + LastCommitted: evt.LastCommitted, + SeqenceNumber: evt.SequenceNumber, + } - b.currentBinlogEntry = entry b.hasBeginQuery = false b.entryContext = &common.BinlogEntryContext{ - Entry: b.currentBinlogEntry, + Entry: entry, TableItems: nil, OriginalSize: 1, // GroupMaxSize is default to 1 and we send on EntriesSize >= GroupMaxSize } case replication.QUERY_EVENT: - gno := b.currentBinlogEntry.Coordinates.GNO + gno := b.entryContext.Entry.Coordinates.GNO evt := ev.Event.(*replication.QueryEvent) query := string(evt.Query) @@ -532,7 +533,7 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c ev.Header.Timestamp, ) - b.currentBinlogEntry.Events = append(b.currentBinlogEntry.Events, event) + b.entryContext.Entry.Events = append(b.entryContext.Entry.Events, event) b.entryContext.OriginalSize += len(ev.RawData) } b.sendEntry(entriesChannel) @@ -655,7 +656,7 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c } event.Table = tableBs } - b.currentBinlogEntry.Events = append(b.currentBinlogEntry.Events, event) + b.entryContext.Entry.Events = append(b.entryContext.Entry.Events, event) } } } @@ -678,7 +679,7 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c } // TODO is the pos the start or the end of a event? // pos if which event should be use? Do we need +1? - b.currentBinlogEntry.Coordinates.LogPos = b.currentCoord.LogPos + b.entryContext.Entry.Coordinates.LogPos = b.currentCoord.LogPos b.sendEntry(entriesChannel) if meetTarget { @@ -689,13 +690,13 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c schemaName := string(rowsEvent.Table.Schema) tableName := string(rowsEvent.Table.Table) b.logger.Debug("got rowsEvent", "schema", schemaName, "table", tableName, - "gno", b.currentBinlogEntry.Coordinates.GNO) + "gno", b.entryContext.Entry.Coordinates.GNO) dml := common.ToEventDML(ev.Header.EventType) skip, table := b.skipRowEvent(rowsEvent, dml) if skip { b.logger.Debug("skip rowsEvent", "schema", schemaName, "table", tableName, - "gno", b.currentBinlogEntry.Coordinates.GNO) + "gno", b.entryContext.Entry.Coordinates.GNO) return nil } @@ -849,7 +850,7 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c dmlEvent.WhereColumnValues.AbstractValues = newRow } } - b.currentBinlogEntry.Events = append(b.currentBinlogEntry.Events, dmlEvent) + b.entryContext.Entry.Events = append(b.entryContext.Entry.Events, dmlEvent) } else { b.logger.Debug("event has not passed 'where'") } @@ -887,15 +888,15 @@ func (b *BinlogReader) checkDtleQueryOSID(query string) error { return fmt.Errorf("bad dtle_gtid splitted for query %v", query) } - b.logger.Debug("query osid", "osid", ss[1], "gno", b.currentBinlogEntry.Coordinates.GNO) + b.logger.Debug("query osid", "osid", ss[1], "gno", b.entryContext.Entry.Coordinates.GNO) - b.currentBinlogEntry.Coordinates.OSID = ss[1] + b.entryContext.Entry.Coordinates.OSID = ss[1] return nil } func (b *BinlogReader) setDtleQuery(query string) string { - if b.currentBinlogEntry.Coordinates.OSID == "" { - uuidStr := uuid.UUID(b.currentBinlogEntry.Coordinates.SID).String() - tag := fmt.Sprintf("/*dtle_gtid1 %v %v %v dtle_gtid*/", b.execCtx.Subject, uuidStr, b.currentBinlogEntry.Coordinates.GNO) + if b.entryContext.Entry.Coordinates.OSID == "" { + uuidStr := uuid.UUID(b.entryContext.Entry.Coordinates.SID).String() + tag := fmt.Sprintf("/*dtle_gtid1 %v %v %v dtle_gtid*/", b.execCtx.Subject, uuidStr, b.entryContext.Entry.Coordinates.GNO) upperQuery := strings.ToUpper(query) if strings.HasPrefix(upperQuery, "CREATE DEFINER=") { @@ -911,7 +912,7 @@ func (b *BinlogReader) setDtleQuery(query string) string { } func (b *BinlogReader) sendEntry(entriesChannel chan<- *common.BinlogEntryContext) { - b.logger.Debug("sendEntry", "gno", b.currentBinlogEntry.Coordinates.GNO, "events", len(b.currentBinlogEntry.Events)) + b.logger.Debug("sendEntry", "gno", b.entryContext.Entry.Coordinates.GNO, "events", len(b.entryContext.Entry.Events)) atomic.AddInt64(b.memory, int64(b.entryContext.Entry.Size())) entriesChannel <- b.entryContext atomic.AddUint32(&b.extractedTxCount, 1) @@ -1309,8 +1310,8 @@ func (b *BinlogReader) skipRowEvent(rowsEvent *replication.RowsEvent, dml int8) if err != nil { b.logger.Error("cycle-prevention: cannot convert sid to uuid", "err", err, "sid", sidByte) } else { - b.currentBinlogEntry.Coordinates.OSID = sid.String() - b.logger.Debug("found an osid", "osid", b.currentBinlogEntry.Coordinates.OSID) + b.entryContext.Entry.Coordinates.OSID = sid.String() + b.logger.Debug("found an osid", "osid", b.entryContext.Entry.Coordinates.OSID) } } } From 36dcba3071ecefd19b56fd1c4e4cf79dca8b61cb Mon Sep 17 00:00:00 2001 From: ffffwh Date: Sun, 12 Sep 2021 23:16:24 +0800 Subject: [PATCH 3/7] add config for memory --- cmd/nomad-plugin/main.go | 9 +++++++++ drivers/mysql/driver.go | 2 ++ g/g.go | 1 + 3 files changed, 12 insertions(+) diff --git a/cmd/nomad-plugin/main.go b/cmd/nomad-plugin/main.go index 125e89322..7c9220d91 100644 --- a/cmd/nomad-plugin/main.go +++ b/cmd/nomad-plugin/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "github.com/shirou/gopsutil/v3/mem" _ "net/http/pprof" "os" "runtime" @@ -29,6 +30,14 @@ func main() { plugins.Serve(func(logger hclog.Logger) interface{} { g.Logger = logger + vmStat, err := mem.VirtualMemory() + if err != nil { + logger.Warn("cannot get available memory. assuming 4096MB") + g.MemAvailable = 4096 * 1024 * 1024 + } + logger.Info("available memory in MB", "size", vmStat.Available / 1024 / 1024) + g.MemAvailable = vmStat.Available + logger.Info("dtle starting", "version", versionStr, "pid", pid) logger.Info("env", "GODEBUG", os.Getenv("GODEBUG"), "GOMAXPROCS", runtime.GOMAXPROCS(0)) logger.Debug("plugins.Serve Factory called.") diff --git a/drivers/mysql/driver.go b/drivers/mysql/driver.go index 8e2be4bc1..96af80b6d 100644 --- a/drivers/mysql/driver.go +++ b/drivers/mysql/driver.go @@ -76,6 +76,7 @@ var ( hclspec.NewLiteral(`""`)), "key_file_path": hclspec.NewDefault(hclspec.NewAttr("key_file_path", "string", false), hclspec.NewLiteral(`""`)), + "memory": hclspec.NewAttr("memory", "string", false), }) // taskConfigSpec is the hcl specification for the driver config section of @@ -274,6 +275,7 @@ type DriverConfig struct { RsaPrivateKeyPath string `codec:"rsa_private_key_path"` CertFilePath string `codec:"cert_file_path"` KeyFilePath string `codec:"key_file_path"` + Memory string `codec:"memory"` } func (d *Driver) SetConfig(c *base.Config) (err error) { diff --git a/g/g.go b/g/g.go index 6d8a56ce9..6987925ac 100644 --- a/g/g.go +++ b/g/g.go @@ -78,6 +78,7 @@ var ( freeMemoryWorkerCount = int32(0) lowMemory = int32(0) memoryMonitorCount = int32(0) + MemAvailable = uint64(0) ) func FreeMemoryWorker() { From 64bfdc1fe1e7c02fbefcc30a553325ba598a06a1 Mon Sep 17 00:00:00 2001 From: ffffwh Date: Sun, 12 Sep 2021 23:18:06 +0800 Subject: [PATCH 4/7] update vendor go-mysql memory limit --- go.mod | 3 +- go.sum | 2 + .../go-mysql/replication/binlogstreamer.go | 5 -- .../go-mysql/replication/binlogsyncer.go | 55 +++++++++++++++---- .../siddontang/go-mysql/replication/event.go | 2 - .../siddontang/go-mysql/replication/parser.go | 2 +- vendor/modules.txt | 2 +- 7 files changed, 49 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 01674be75..3b27bbe53 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 // indirect github.com/satori/go.uuid v1.2.0 github.com/shirou/gopsutil v2.20.2+incompatible + github.com/shirou/gopsutil/v3 v3.21.6-0.20210619153009-7ea8062810b6 github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 github.com/siddontang/go-mysql v0.0.0-20200311002057-7a62847fcdb5 github.com/stretchr/testify v1.6.1 @@ -64,7 +65,7 @@ require ( google.golang.org/grpc v1.28.0 // indirect ) -replace github.com/siddontang/go-mysql => github.com/ffffwh/go-mysql v0.0.0-20201125093656-67dc9957b4da +replace github.com/siddontang/go-mysql => github.com/ffffwh/go-mysql v0.0.0-20210912151044-a63debbbb0a3 //replace github.com/Sirupsen/logrus => github.com/sirupsen/logrus v1.4.2 diff --git a/go.sum b/go.sum index e1d39a916..1790b2796 100644 --- a/go.sum +++ b/go.sum @@ -313,6 +313,8 @@ github.com/ffffwh/go-mysql v0.0.0-20201009082833-67dc9957b4da/go.mod h1:+W4RCzes github.com/ffffwh/go-mysql v0.0.0-20201009082834-5fab8e2e5b54/go.mod h1:Bl4lryU44qtIXEXNbP0k0pD646Nkw/qHn21wfZVGJx4= github.com/ffffwh/go-mysql v0.0.0-20201125093656-67dc9957b4da h1:xRPnR5h12OGa4GzlPn8atuCpiihGv1emZN+PrR9kbzM= github.com/ffffwh/go-mysql v0.0.0-20201125093656-67dc9957b4da/go.mod h1:+W4RCzesQDI11HvIkaDjS8yM36SpAnGNQ7jmTLn5BnU= +github.com/ffffwh/go-mysql v0.0.0-20210912151044-a63debbbb0a3 h1:DF3016zo73aJ/ONoANThbPAI4ZQRZVR1IW7ZvGEgcKY= +github.com/ffffwh/go-mysql v0.0.0-20210912151044-a63debbbb0a3/go.mod h1:+W4RCzesQDI11HvIkaDjS8yM36SpAnGNQ7jmTLn5BnU= github.com/ffffwh/qlbridge v0.0.0-20181026023605-fc2d5205 h1:FmRxc/ePtu+IAPDxRhHIOwzAbBRwIIcJHiC9ra6fraI= github.com/ffffwh/qlbridge v0.0.0-20181026023605-fc2d5205/go.mod h1:OcgDZHkJGBiiCpy241ZbouFjd+7oU9qUf7ZXbwBQMWE= github.com/ffffwh/qlbridge v0.0.0-20181026023605-fc2d5205dad3 h1:aQDDUNvnvhWw7pD8Bl33tBUAp5uDsFSPPMXijWr5fI0= diff --git a/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go b/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go index d53741bfb..62f840369 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go +++ b/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go @@ -3,7 +3,6 @@ package replication import ( "context" "github.com/pingcap/errors" - "github.com/opentracing/opentracing-go" "github.com/siddontang/go-log/log" "sync/atomic" "time" @@ -31,10 +30,6 @@ func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) { select { case c := <-s.ch: - span := opentracing.StartSpan("send binlogEvent from go-mysql", opentracing.FollowsFrom(c.SpanContest)) - span.SetTag("send event from go mysql time ", time.Now().Unix()) - c.SpanContest = span.Context() - span.Finish() atomic.AddInt64(&s.mem, -int64(len(c.RawData))) return c, nil case s.err = <-s.ech: diff --git a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go index 2926d37b6..ef9d75269 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go +++ b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go @@ -12,7 +12,6 @@ import ( "time" "github.com/pingcap/errors" - "github.com/opentracing/opentracing-go" uuid "github.com/satori/go.uuid" "github.com/siddontang/go-log/log" "github.com/siddontang/go-mysql/client" @@ -106,6 +105,13 @@ type BinlogSyncerConfig struct { // https://mariadb.com/kb/en/library/com_binlog_dump/ // https://mariadb.com/kb/en/library/annotate_rows_event/ DumpCommandFlag uint16 + + // When streamer.QueueMem() is reaching the size, pause handling events until QueueMem() + // has decreased. 0 for no limit. + MemLimitSize int64 + // When having paused for MemLimitSeconds, force to handle a event to prevent MySQL + // net_write_timeout. + MemLimitSeconds int } // BinlogSyncer syncs binlog event from server. @@ -647,10 +653,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { }() for { - span := opentracing.StartSpan("data source: get incremental data from ReadPacket()") - span.SetTag("before get incremental data time:", time.Now().Unix()) data, err := b.c.ReadPacket() - span.SetTag("after get incremental data time:", time.Now().Unix()) select { case <-b.ctx.Done(): s.close() @@ -709,7 +712,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { switch data[0] { case OK_HEADER: - if err = b.parseEvent(span.Context(), s, data); err != nil { + if err = b.parseEvent(s, data); err != nil { s.closeWithError(err) return } @@ -728,16 +731,12 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { log.Errorf("invalid stream header %c", data[0]) continue } - span.Finish() } } -func (b *BinlogSyncer) parseEvent(spanContext opentracing.SpanContext, s *BinlogStreamer, data []byte) error { +func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { //skip OK byte, 0x00 data = data[1:] - span := opentracing.GlobalTracer().StartSpan(" incremental data are conversion to BinlogEvent", opentracing.ChildOf(spanContext)) - span.SetTag("time", time.Now().Unix()) - defer span.Finish() needACK := false if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) { needACK = (data[1] == 0x01) @@ -746,8 +745,6 @@ func (b *BinlogSyncer) parseEvent(spanContext opentracing.SpanContext, s *Binlog } e, err := b.parser.Parse(data) - e.SpanContest = span.Context() - span.SetTag("tx timestap", e.Header.Timestamp) if err != nil { return errors.Trace(err) } @@ -808,6 +805,40 @@ func (b *BinlogSyncer) parseEvent(spanContext opentracing.SpanContext, s *Binlog event.GSet = getCurrentGtidSet() } + reachingMemLimit := func() bool { + if b.cfg.MemLimitSize <= 0 { + return false + } + if s.QueueSize() == 0 { + return false + } + if s.QueueMem() + int64(len(e.RawData)) > b.cfg.MemLimitSize { + return true + } else { + return false + } + } + for i := 0; ; i++ { + if !b.running { + break + } + if !reachingMemLimit() { + if i > 0 { + log.Infof("reachingMemLimit. continue. %v %v", s.QueueMem(), i) + } + break + } + if i >= 2 * b.cfg.MemLimitSeconds { + log.Infof("reachingMemLimit. force continue. %v", s.QueueMem()) + // To prevent MySQL net_write_timeout + break + } + if i == 0 { + log.Infof("reachingMemLimit. sleep. %v %v", s.QueueMem(), i) + } + time.Sleep(500 * time.Millisecond) + } + needStop := false select { case s.ch <- e: diff --git a/vendor/github.com/siddontang/go-mysql/replication/event.go b/vendor/github.com/siddontang/go-mysql/replication/event.go index 43897fa3e..947e2e5a9 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/event.go +++ b/vendor/github.com/siddontang/go-mysql/replication/event.go @@ -11,7 +11,6 @@ import ( "unicode" "github.com/pingcap/errors" - "github.com/opentracing/opentracing-go" "github.com/satori/go.uuid" . "github.com/siddontang/go-mysql/mysql" ) @@ -31,7 +30,6 @@ type BinlogEvent struct { Header *EventHeader Event Event - SpanContest opentracing.SpanContext } func (e *BinlogEvent) Dump(w io.Writer) { diff --git a/vendor/github.com/siddontang/go-mysql/replication/parser.go b/vendor/github.com/siddontang/go-mysql/replication/parser.go index 4d220c030..a0cc46cc7 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/parser.go +++ b/vendor/github.com/siddontang/go-mysql/replication/parser.go @@ -335,7 +335,7 @@ func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) { return nil, err } - return &BinlogEvent{RawData: rawData, Header: h, Event: e, SpanContest: nil}, nil + return &BinlogEvent{RawData: rawData, Header: h, Event: e}, nil } func (p *BinlogParser) verifyCrc32Checksum(rawData []byte) error { diff --git a/vendor/modules.txt b/vendor/modules.txt index de4790017..463e3364c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -497,7 +497,7 @@ github.com/siddontang/go/sync2 # github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 github.com/siddontang/go-log/log github.com/siddontang/go-log/loggers -# github.com/siddontang/go-mysql v0.0.0-20200311002057-7a62847fcdb5 => github.com/ffffwh/go-mysql v0.0.0-20201125093656-67dc9957b4da +# github.com/siddontang/go-mysql v0.0.0-20200311002057-7a62847fcdb5 => github.com/ffffwh/go-mysql v0.0.0-20210912151044-a63debbbb0a3 github.com/siddontang/go-mysql/client github.com/siddontang/go-mysql/mysql github.com/siddontang/go-mysql/packet From 2bcab53f6a98f67b4409e4106f2c9b2c85e15ddc Mon Sep 17 00:00:00 2001 From: ffffwh Date: Tue, 7 Sep 2021 15:59:07 +0800 Subject: [PATCH 5/7] binlog_reader: split big tx --- drivers/mysql/mysql/applier.go | 4 ++++ drivers/mysql/mysql/binlog/binlog_reader.go | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/drivers/mysql/mysql/applier.go b/drivers/mysql/mysql/applier.go index 4a83f11d5..261872c20 100644 --- a/drivers/mysql/mysql/applier.go +++ b/drivers/mysql/mysql/applier.go @@ -563,6 +563,10 @@ func (a *Applier) subscribeNats() (err error) { a.logger.Debug("incr. after publish nats reply.") } else { bs := incrNMM.GetBytes() + for g.IsLowMemory() { + a.logger.Debug("incr. low mem. waiting") + time.Sleep(900 * time.Millisecond) + } select { case <-a.shutdownCh: return diff --git a/drivers/mysql/mysql/binlog/binlog_reader.go b/drivers/mysql/mysql/binlog/binlog_reader.go index 65725e98c..20871408f 100644 --- a/drivers/mysql/mysql/binlog/binlog_reader.go +++ b/drivers/mysql/mysql/binlog/binlog_reader.go @@ -243,6 +243,8 @@ func NewBinlogReader(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, ReadTimeout: 12 * time.Second, ParseTime: false, // must be false, or gencode will complain. + + MemLimitSize: int64(g.MemAvailable * 10 / 2), } binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig) } @@ -467,6 +469,8 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c LastCommitted: evt.LastCommitted, SeqenceNumber: evt.SequenceNumber, } + entry.Index = 0 + entry.Final = true b.hasBeginQuery = false b.entryContext = &common.BinlogEntryContext{ @@ -854,6 +858,21 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c } else { b.logger.Debug("event has not passed 'where'") } + + if b.entryContext.OriginalSize >= bigTxSplittingSize { + b.logger.Debug("splitting big tx", "index", b.entryContext.Entry.Index) + b.entryContext.Entry.Final = false + b.sendEntry(entriesChannel) + entry := common.NewBinlogEntry() + entry.Coordinates = b.entryContext.Entry.Coordinates + entry.Index = b.entryContext.Entry.Index + 1 + entry.Final = true + b.entryContext = &common.BinlogEntryContext{ + Entry: entry, + TableItems: nil, + OriginalSize: 1, + } + } } return nil } @@ -864,6 +883,8 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c const ( dtleQueryPrefix = "/*dtle_gtid1 " dtleQuerySuffix = " dtle_gtid*/" + + bigTxSplittingSize = 64 * 1024 * 1024 ) func (b *BinlogReader) checkDtleQueryOSID(query string) error { From 027c437826a551d74a3e396fcee8a1d0cde5331d Mon Sep 17 00:00:00 2001 From: ffffwh Date: Sun, 12 Sep 2021 22:34:59 +0800 Subject: [PATCH 6/7] use instant value for IsLowMemory instead of updating the value onece a second. --- g/g.go | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/g/g.go b/g/g.go index 6987925ac..b42e1aca9 100644 --- a/g/g.go +++ b/g/g.go @@ -76,7 +76,7 @@ func StringPtrEmpty(p *string) bool { var ( freeMemoryCh = make(chan struct{}) freeMemoryWorkerCount = int32(0) - lowMemory = int32(0) + lowMemory = false memoryMonitorCount = int32(0) MemAvailable = uint64(0) ) @@ -103,7 +103,25 @@ func TriggerFreeMemory() { func IsLowMemory() bool { //return atomic.LoadInt32(&lowMemory) == 1 - return lowMemory == 1 + memory, err := mem.VirtualMemory() + if err != nil { + return false + } + if ((memory.Available * 10) < (memory.Total * 2)) && (memory.Available < 1*1024*1024*1024) { + if !lowMemory { + Logger.Warn("memory is less than 20% and 1GB. pause parsing binlog", + "available", memory.Available, "total", memory.Total) + } + lowMemory = true + return true + } else { + if lowMemory { + Logger.Info("memory is greater than 20% or 1GB. continue parsing binlog", + "available", memory.Available, "total", memory.Total) + } + lowMemory = false + return false + } } func MemoryMonitor(logger LoggerType) { @@ -115,24 +133,10 @@ func MemoryMonitor(logger LoggerType) { for { <-t.C - memory, err := mem.VirtualMemory() - if err != nil { - lowMemory = 0 + if IsLowMemory() { + TriggerFreeMemory() } else { - if (float64(memory.Available)/float64(memory.Total) < 0.2) && (memory.Available < 1*1024*1024*1024) { - if lowMemory == 0 { - logger.Warn("memory is less than 20% and 1GB. pause parsing binlog", - "available", memory.Available, "total", memory.Total) - } - lowMemory = 1 - TriggerFreeMemory() - } else { - if lowMemory == 1 { - logger.Info("memory is greater than 20% or 1GB. continue parsing binlog", - "available", memory.Available, "total", memory.Total) - } - lowMemory = 0 - } + } } } From 4b865352145167e4bfa784b4142c67dfb5731d46 Mon Sep 17 00:00:00 2001 From: ffffwh Date: Fri, 17 Sep 2021 17:26:15 +0800 Subject: [PATCH 7/7] require dest ack for big tx --- drivers/mysql/common/type.schema | 5 ++ drivers/mysql/common/type.schema.gen.go | 70 +++++++++++++++++++++ drivers/mysql/mysql/applier.go | 24 +++++-- drivers/mysql/mysql/applier_incr.go | 11 ++-- drivers/mysql/mysql/binlog/binlog_reader.go | 9 +++ drivers/mysql/mysql/extractor.go | 12 ++++ 6 files changed, 120 insertions(+), 11 deletions(-) diff --git a/drivers/mysql/common/type.schema b/drivers/mysql/common/type.schema index 64c56c35b..12b3dcaac 100644 --- a/drivers/mysql/common/type.schema +++ b/drivers/mysql/common/type.schema @@ -79,3 +79,8 @@ struct ControlMsg { Type int32 Msg string } + +struct BigTxAck { + GNO int64 + Index int32 +} diff --git a/drivers/mysql/common/type.schema.gen.go b/drivers/mysql/common/type.schema.gen.go index 12c7110f2..de19b2231 100644 --- a/drivers/mysql/common/type.schema.gen.go +++ b/drivers/mysql/common/type.schema.gen.go @@ -2930,3 +2930,73 @@ func (d *ControlMsg) Unmarshal(buf []byte) (uint64, error) { } return i + 4, nil } + +type BigTxAck struct { + GNO int64 + Index int32 +} + +func (d *BigTxAck) Size() (s uint64) { + + s += 12 + return +} +func (d *BigTxAck) Marshal(buf []byte) ([]byte, error) { + size := d.Size() + { + if uint64(cap(buf)) >= size { + buf = buf[:size] + } else { + buf = make([]byte, size) + } + } + i := uint64(0) + + { + + buf[0+0] = byte(d.GNO >> 0) + + buf[1+0] = byte(d.GNO >> 8) + + buf[2+0] = byte(d.GNO >> 16) + + buf[3+0] = byte(d.GNO >> 24) + + buf[4+0] = byte(d.GNO >> 32) + + buf[5+0] = byte(d.GNO >> 40) + + buf[6+0] = byte(d.GNO >> 48) + + buf[7+0] = byte(d.GNO >> 56) + + } + { + + buf[0+8] = byte(d.Index >> 0) + + buf[1+8] = byte(d.Index >> 8) + + buf[2+8] = byte(d.Index >> 16) + + buf[3+8] = byte(d.Index >> 24) + + } + return buf[:i+12], nil +} + +func (d *BigTxAck) Unmarshal(buf []byte) (uint64, error) { + i := uint64(0) + + { + + d.GNO = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56) + + } + { + + d.Index = 0 | (int32(buf[0+8]) << 0) | (int32(buf[1+8]) << 8) | (int32(buf[2+8]) << 16) | (int32(buf[3+8]) << 24) + + } + return i + 12, nil +} diff --git a/drivers/mysql/mysql/applier.go b/drivers/mysql/mysql/applier.go index 261872c20..8930827c0 100644 --- a/drivers/mysql/mysql/applier.go +++ b/drivers/mysql/mysql/applier.go @@ -303,8 +303,24 @@ func (a *Applier) Run() { a.onError(common.TaskStateDead, errors.Wrap(err, "NewApplierIncr")) return } - a.ai.EntryCommittedHook = func(entry *common.BinlogEntry) { - a.gtidCh <- &entry.Coordinates + a.ai.EntryExecutedHook = func(entry *common.BinlogEntry) { + if entry.Final { + a.gtidCh <- &entry.Coordinates + } + if entry.IsPartOfBigTx() { + bs, err := (&common.BigTxAck{ + GNO: entry.Coordinates.GNO, + Index: entry.Index, + }).Marshal(nil) + if err != nil { + a.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. Marshal")) + } + _, err = a.natsConn.Request(fmt.Sprintf("%s_bigtx_ack", a.subject), + bs, 1 * time.Minute) + if err != nil { + a.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. Request")) + } + } } a.ai.OnError = a.onError @@ -563,10 +579,6 @@ func (a *Applier) subscribeNats() (err error) { a.logger.Debug("incr. after publish nats reply.") } else { bs := incrNMM.GetBytes() - for g.IsLowMemory() { - a.logger.Debug("incr. low mem. waiting") - time.Sleep(900 * time.Millisecond) - } select { case <-a.shutdownCh: return diff --git a/drivers/mysql/mysql/applier_incr.go b/drivers/mysql/mysql/applier_incr.go index 32974c3bd..3a7a9d38e 100644 --- a/drivers/mysql/mysql/applier_incr.go +++ b/drivers/mysql/mysql/applier_incr.go @@ -47,8 +47,8 @@ type ApplierIncr struct { gtidSet *gomysql.MysqlGTIDSet gtidSetLock *sync.RWMutex - gtidItemMap base.GtidItemMap - EntryCommittedHook func(entry *common.BinlogEntry) + gtidItemMap base.GtidItemMap + EntryExecutedHook func(entry *common.BinlogEntry) tableItems mapSchemaTableItems @@ -212,7 +212,7 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.BinlogEntryContext) (err erro if binlogEntry.Coordinates.OSID == a.MySQLServerUuid { a.logger.Debug("skipping a dtle tx.", "osid", binlogEntry.Coordinates.OSID) - a.EntryCommittedHook(binlogEntry) // make gtid continuous + a.EntryExecutedHook(binlogEntry) // make gtid continuous return nil } txSid := binlogEntry.Coordinates.GetSid() @@ -379,6 +379,7 @@ func (a *ApplierIncr) heterogeneousReplay() { for _, entry := range binlogEntries.Entries { a.binlogEntryQueue <- entry + a.logger.Debug("") atomic.AddInt64(a.memory2, int64(entry.Size())) } @@ -589,10 +590,9 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Bin } if err := dbApplier.Tx.Commit(); err != nil { - a.OnError(common.TaskStateDead, err) + return errors.Wrap(err, "dbApplier.Tx.Commit") } else { a.mtsManager.Executed(binlogEntry) - a.EntryCommittedHook(binlogEntry) } dbApplier.Tx = nil if a.printTps { @@ -600,6 +600,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Bin } atomic.AddUint32(&a.appliedTxCount, 1) } + a.EntryExecutedHook(binlogEntry) // no error a.mysqlContext.Stage = common.StageWaitingForGtidToBeCommitted diff --git a/drivers/mysql/mysql/binlog/binlog_reader.go b/drivers/mysql/mysql/binlog/binlog_reader.go index 20871408f..89f61a897 100644 --- a/drivers/mysql/mysql/binlog/binlog_reader.go +++ b/drivers/mysql/mysql/binlog/binlog_reader.go @@ -99,6 +99,8 @@ type BinlogReader struct { targetGtid gomysql.GTIDSet currentGtidSet gomysql.GTIDSet currentGtidSetMutex sync.RWMutex + + HasBigTx sync.WaitGroup } type SqlFilter struct { @@ -933,6 +935,9 @@ func (b *BinlogReader) setDtleQuery(query string) string { } func (b *BinlogReader) sendEntry(entriesChannel chan<- *common.BinlogEntryContext) { + if b.entryContext.Entry.IsPartOfBigTx() { + b.HasBigTx.Add(1) + } b.logger.Debug("sendEntry", "gno", b.entryContext.Entry.Coordinates.GNO, "events", len(b.entryContext.Entry.Events)) atomic.AddInt64(b.memory, int64(b.entryContext.Entry.Size())) entriesChannel <- b.entryContext @@ -1054,6 +1059,10 @@ func (b *BinlogReader) DataStreamEvents(entriesChannel chan<- *common.BinlogEntr break } + b.logger.Trace("b.HasBigTx.Wait. before") + b.HasBigTx.Wait() + b.logger.Trace("b.HasBigTx.Wait. after") + ev, err := b.binlogStreamer.GetEvent(context.Background()) if err != nil { b.logger.Error("error GetEvent.", "err", err) diff --git a/drivers/mysql/mysql/extractor.go b/drivers/mysql/mysql/extractor.go index 7de3058f0..d8bfa19a7 100644 --- a/drivers/mysql/mysql/extractor.go +++ b/drivers/mysql/mysql/extractor.go @@ -614,6 +614,18 @@ func (e *Extractor) initNatsPubClient(natsAddr string) (err error) { return } + _, err = e.natsConn.Subscribe(fmt.Sprintf("%s_bigtx_ack", e.subject), func(m *gonats.Msg) { + err := e.natsConn.Publish(m.Reply, nil) + if err != nil { + e.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. reply")) + } + + ack := &common.BigTxAck{} + _, err = ack.Unmarshal(m.Data) + e.logger.Debug("bigtx_ack", "gno", ack.GNO, "index", ack.Index) + e.binlogReader.HasBigTx.Add(-1) + }) + _, err = e.natsConn.Subscribe(fmt.Sprintf("%s_progress", e.subject), func(m *gonats.Msg) { binlogFile := string(m.Data) e.logger.Debug("progress", "binlogFile", binlogFile)