Skip to content

Commit

Permalink
require dest ack for big tx
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Oct 15, 2021
1 parent 027c437 commit 4b86535
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 11 deletions.
5 changes: 5 additions & 0 deletions drivers/mysql/common/type.schema
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,8 @@ struct ControlMsg {
Type int32
Msg string
}

struct BigTxAck {
GNO int64
Index int32
}
70 changes: 70 additions & 0 deletions drivers/mysql/common/type.schema.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2930,3 +2930,73 @@ func (d *ControlMsg) Unmarshal(buf []byte) (uint64, error) {
}
return i + 4, nil
}

type BigTxAck struct {
GNO int64
Index int32
}

func (d *BigTxAck) Size() (s uint64) {

s += 12
return
}
func (d *BigTxAck) Marshal(buf []byte) ([]byte, error) {
size := d.Size()
{
if uint64(cap(buf)) >= size {
buf = buf[:size]
} else {
buf = make([]byte, size)
}
}
i := uint64(0)

{

buf[0+0] = byte(d.GNO >> 0)

buf[1+0] = byte(d.GNO >> 8)

buf[2+0] = byte(d.GNO >> 16)

buf[3+0] = byte(d.GNO >> 24)

buf[4+0] = byte(d.GNO >> 32)

buf[5+0] = byte(d.GNO >> 40)

buf[6+0] = byte(d.GNO >> 48)

buf[7+0] = byte(d.GNO >> 56)

}
{

buf[0+8] = byte(d.Index >> 0)

buf[1+8] = byte(d.Index >> 8)

buf[2+8] = byte(d.Index >> 16)

buf[3+8] = byte(d.Index >> 24)

}
return buf[:i+12], nil
}

func (d *BigTxAck) Unmarshal(buf []byte) (uint64, error) {
i := uint64(0)

{

d.GNO = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56)

}
{

d.Index = 0 | (int32(buf[0+8]) << 0) | (int32(buf[1+8]) << 8) | (int32(buf[2+8]) << 16) | (int32(buf[3+8]) << 24)

}
return i + 12, nil
}
24 changes: 18 additions & 6 deletions drivers/mysql/mysql/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,24 @@ func (a *Applier) Run() {
a.onError(common.TaskStateDead, errors.Wrap(err, "NewApplierIncr"))
return
}
a.ai.EntryCommittedHook = func(entry *common.BinlogEntry) {
a.gtidCh <- &entry.Coordinates
a.ai.EntryExecutedHook = func(entry *common.BinlogEntry) {
if entry.Final {
a.gtidCh <- &entry.Coordinates
}
if entry.IsPartOfBigTx() {
bs, err := (&common.BigTxAck{
GNO: entry.Coordinates.GNO,
Index: entry.Index,
}).Marshal(nil)
if err != nil {
a.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. Marshal"))
}
_, err = a.natsConn.Request(fmt.Sprintf("%s_bigtx_ack", a.subject),
bs, 1 * time.Minute)
if err != nil {
a.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. Request"))
}
}
}
a.ai.OnError = a.onError

Expand Down Expand Up @@ -563,10 +579,6 @@ func (a *Applier) subscribeNats() (err error) {
a.logger.Debug("incr. after publish nats reply.")
} else {
bs := incrNMM.GetBytes()
for g.IsLowMemory() {
a.logger.Debug("incr. low mem. waiting")
time.Sleep(900 * time.Millisecond)
}
select {
case <-a.shutdownCh:
return
Expand Down
11 changes: 6 additions & 5 deletions drivers/mysql/mysql/applier_incr.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type ApplierIncr struct {

gtidSet *gomysql.MysqlGTIDSet
gtidSetLock *sync.RWMutex
gtidItemMap base.GtidItemMap
EntryCommittedHook func(entry *common.BinlogEntry)
gtidItemMap base.GtidItemMap
EntryExecutedHook func(entry *common.BinlogEntry)

tableItems mapSchemaTableItems

Expand Down Expand Up @@ -212,7 +212,7 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.BinlogEntryContext) (err erro

if binlogEntry.Coordinates.OSID == a.MySQLServerUuid {
a.logger.Debug("skipping a dtle tx.", "osid", binlogEntry.Coordinates.OSID)
a.EntryCommittedHook(binlogEntry) // make gtid continuous
a.EntryExecutedHook(binlogEntry) // make gtid continuous
return nil
}
txSid := binlogEntry.Coordinates.GetSid()
Expand Down Expand Up @@ -379,6 +379,7 @@ func (a *ApplierIncr) heterogeneousReplay() {

for _, entry := range binlogEntries.Entries {
a.binlogEntryQueue <- entry
a.logger.Debug("")
atomic.AddInt64(a.memory2, int64(entry.Size()))
}

Expand Down Expand Up @@ -589,17 +590,17 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Bin
}

if err := dbApplier.Tx.Commit(); err != nil {
a.OnError(common.TaskStateDead, err)
return errors.Wrap(err, "dbApplier.Tx.Commit")
} else {
a.mtsManager.Executed(binlogEntry)
a.EntryCommittedHook(binlogEntry)
}
dbApplier.Tx = nil
if a.printTps {
atomic.AddUint32(&a.txLastNSeconds, 1)
}
atomic.AddUint32(&a.appliedTxCount, 1)
}
a.EntryExecutedHook(binlogEntry)

// no error
a.mysqlContext.Stage = common.StageWaitingForGtidToBeCommitted
Expand Down
9 changes: 9 additions & 0 deletions drivers/mysql/mysql/binlog/binlog_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ type BinlogReader struct {
targetGtid gomysql.GTIDSet
currentGtidSet gomysql.GTIDSet
currentGtidSetMutex sync.RWMutex

HasBigTx sync.WaitGroup
}

type SqlFilter struct {
Expand Down Expand Up @@ -933,6 +935,9 @@ func (b *BinlogReader) setDtleQuery(query string) string {
}

func (b *BinlogReader) sendEntry(entriesChannel chan<- *common.BinlogEntryContext) {
if b.entryContext.Entry.IsPartOfBigTx() {
b.HasBigTx.Add(1)
}
b.logger.Debug("sendEntry", "gno", b.entryContext.Entry.Coordinates.GNO, "events", len(b.entryContext.Entry.Events))
atomic.AddInt64(b.memory, int64(b.entryContext.Entry.Size()))
entriesChannel <- b.entryContext
Expand Down Expand Up @@ -1054,6 +1059,10 @@ func (b *BinlogReader) DataStreamEvents(entriesChannel chan<- *common.BinlogEntr
break
}

b.logger.Trace("b.HasBigTx.Wait. before")
b.HasBigTx.Wait()
b.logger.Trace("b.HasBigTx.Wait. after")

ev, err := b.binlogStreamer.GetEvent(context.Background())
if err != nil {
b.logger.Error("error GetEvent.", "err", err)
Expand Down
12 changes: 12 additions & 0 deletions drivers/mysql/mysql/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,18 @@ func (e *Extractor) initNatsPubClient(natsAddr string) (err error) {
return
}

_, err = e.natsConn.Subscribe(fmt.Sprintf("%s_bigtx_ack", e.subject), func(m *gonats.Msg) {
err := e.natsConn.Publish(m.Reply, nil)
if err != nil {
e.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. reply"))
}

ack := &common.BigTxAck{}
_, err = ack.Unmarshal(m.Data)
e.logger.Debug("bigtx_ack", "gno", ack.GNO, "index", ack.Index)
e.binlogReader.HasBigTx.Add(-1)
})

_, err = e.natsConn.Subscribe(fmt.Sprintf("%s_progress", e.subject), func(m *gonats.Msg) {
binlogFile := string(m.Data)
e.logger.Debug("progress", "binlogFile", binlogFile)
Expand Down

0 comments on commit 4b86535

Please sign in to comment.