Skip to content

Commit

Permalink
Merge branch '779-big-tx-memory-1' #779
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Oct 15, 2021
2 parents 69bb18d + 4b86535 commit 9b55df4
Show file tree
Hide file tree
Showing 17 changed files with 363 additions and 131 deletions.
9 changes: 9 additions & 0 deletions cmd/nomad-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"github.com/shirou/gopsutil/v3/mem"
_ "net/http/pprof"
"os"
"runtime"
Expand All @@ -29,6 +30,14 @@ func main() {
plugins.Serve(func(logger hclog.Logger) interface{} {
g.Logger = logger

vmStat, err := mem.VirtualMemory()
if err != nil {
logger.Warn("cannot get available memory. assuming 4096MB")
g.MemAvailable = 4096 * 1024 * 1024
}
logger.Info("available memory in MB", "size", vmStat.Available / 1024 / 1024)
g.MemAvailable = vmStat.Available

logger.Info("dtle starting", "version", versionStr, "pid", pid)
logger.Info("env", "GODEBUG", os.Getenv("GODEBUG"), "GOMAXPROCS", runtime.GOMAXPROCS(0))
logger.Debug("plugins.Serve Factory called.")
Expand Down
4 changes: 4 additions & 0 deletions drivers/mysql/common/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (b *BinlogEntry) HasDDL() bool {
return false
}

func (b *BinlogEntry) IsPartOfBigTx() bool {
return !(b.Index == 0 && b.Final)
}

// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
func (b *BinlogEntry) String() string {
return fmt.Sprintf("[BinlogEntry at %+v]", b.Coordinates)
Expand Down
11 changes: 9 additions & 2 deletions drivers/mysql/common/type.schema
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ struct DataEvent {
}

struct BinlogEntry {
Coordinates BinlogCoordinateTx
Events []DataEvent
Coordinates BinlogCoordinateTx
Events []DataEvent
Index int32
Final bool
}

struct BinlogEntries {
Expand All @@ -77,3 +79,8 @@ struct ControlMsg {
Type int32
Msg string
}

struct BigTxAck {
GNO int64
Index int32
}
103 changes: 101 additions & 2 deletions drivers/mysql/common/type.schema.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2529,6 +2529,8 @@ func (d *DataEvent) Unmarshal(buf []byte) (uint64, error) {
type BinlogEntry struct {
Coordinates BinlogCoordinateTx
Events []DataEvent
Index int32
Final bool
}

func (d *BinlogEntry) Size() (s uint64) {
Expand Down Expand Up @@ -2559,6 +2561,7 @@ func (d *BinlogEntry) Size() (s uint64) {
}

}
s += 5
return
}
func (d *BinlogEntry) Marshal(buf []byte) ([]byte, error) {
Expand Down Expand Up @@ -2607,7 +2610,25 @@ func (d *BinlogEntry) Marshal(buf []byte) ([]byte, error) {

}
}
return buf[:i+0], nil
{

buf[i+0+0] = byte(d.Index >> 0)

buf[i+1+0] = byte(d.Index >> 8)

buf[i+2+0] = byte(d.Index >> 16)

buf[i+3+0] = byte(d.Index >> 24)

}
{
if d.Final {
buf[i+4] = 1
} else {
buf[i+4] = 0
}
}
return buf[:i+5], nil
}

func (d *BinlogEntry) Unmarshal(buf []byte) (uint64, error) {
Expand Down Expand Up @@ -2654,7 +2675,15 @@ func (d *BinlogEntry) Unmarshal(buf []byte) (uint64, error) {

}
}
return i + 0, nil
{

d.Index = 0 | (int32(buf[i+0+0]) << 0) | (int32(buf[i+1+0]) << 8) | (int32(buf[i+2+0]) << 16) | (int32(buf[i+3+0]) << 24)

}
{
d.Final = buf[i+4] == 1
}
return i + 5, nil
}

type BinlogEntries struct {
Expand Down Expand Up @@ -2901,3 +2930,73 @@ func (d *ControlMsg) Unmarshal(buf []byte) (uint64, error) {
}
return i + 4, nil
}

type BigTxAck struct {
GNO int64
Index int32
}

func (d *BigTxAck) Size() (s uint64) {

s += 12
return
}
func (d *BigTxAck) Marshal(buf []byte) ([]byte, error) {
size := d.Size()
{
if uint64(cap(buf)) >= size {
buf = buf[:size]
} else {
buf = make([]byte, size)
}
}
i := uint64(0)

{

buf[0+0] = byte(d.GNO >> 0)

buf[1+0] = byte(d.GNO >> 8)

buf[2+0] = byte(d.GNO >> 16)

buf[3+0] = byte(d.GNO >> 24)

buf[4+0] = byte(d.GNO >> 32)

buf[5+0] = byte(d.GNO >> 40)

buf[6+0] = byte(d.GNO >> 48)

buf[7+0] = byte(d.GNO >> 56)

}
{

buf[0+8] = byte(d.Index >> 0)

buf[1+8] = byte(d.Index >> 8)

buf[2+8] = byte(d.Index >> 16)

buf[3+8] = byte(d.Index >> 24)

}
return buf[:i+12], nil
}

func (d *BigTxAck) Unmarshal(buf []byte) (uint64, error) {
i := uint64(0)

{

d.GNO = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56)

}
{

d.Index = 0 | (int32(buf[0+8]) << 0) | (int32(buf[1+8]) << 8) | (int32(buf[2+8]) << 16) | (int32(buf[3+8]) << 24)

}
return i + 12, nil
}
2 changes: 2 additions & 0 deletions drivers/mysql/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ var (
hclspec.NewLiteral(`""`)),
"key_file_path": hclspec.NewDefault(hclspec.NewAttr("key_file_path", "string", false),
hclspec.NewLiteral(`""`)),
"memory": hclspec.NewAttr("memory", "string", false),
})

// taskConfigSpec is the hcl specification for the driver config section of
Expand Down Expand Up @@ -274,6 +275,7 @@ type DriverConfig struct {
RsaPrivateKeyPath string `codec:"rsa_private_key_path"`
CertFilePath string `codec:"cert_file_path"`
KeyFilePath string `codec:"key_file_path"`
Memory string `codec:"memory"`
}

func (d *Driver) SetConfig(c *base.Config) (err error) {
Expand Down
20 changes: 18 additions & 2 deletions drivers/mysql/mysql/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,24 @@ func (a *Applier) Run() {
a.onError(common.TaskStateDead, errors.Wrap(err, "NewApplierIncr"))
return
}
a.ai.EntryCommittedHook = func(entry *common.BinlogEntry) {
a.gtidCh <- &entry.Coordinates
a.ai.EntryExecutedHook = func(entry *common.BinlogEntry) {
if entry.Final {
a.gtidCh <- &entry.Coordinates
}
if entry.IsPartOfBigTx() {
bs, err := (&common.BigTxAck{
GNO: entry.Coordinates.GNO,
Index: entry.Index,
}).Marshal(nil)
if err != nil {
a.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. Marshal"))
}
_, err = a.natsConn.Request(fmt.Sprintf("%s_bigtx_ack", a.subject),
bs, 1 * time.Minute)
if err != nil {
a.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. Request"))
}
}
}
a.ai.OnError = a.onError

Expand Down
Loading

0 comments on commit 9b55df4

Please sign in to comment.