Skip to content

Commit

Permalink
qrep first draft
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 31, 2024
1 parent dd661e9 commit f02aa2d
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 160 deletions.
1 change: 1 addition & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ var (
_ CreateTablesFromExistingConnector = &connsnowflake.SnowflakeConnector{}

_ QRepPullConnector = &connpostgres.PostgresConnector{}
_ QRepPullConnector = &connmysql.MySqlConnector{}
_ QRepPullConnector = &connsqlserver.SQLServerConnector{}

_ QRepPullPgConnector = &connpostgres.PostgresConnector{}
Expand Down
189 changes: 64 additions & 125 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,74 +66,13 @@ func (c *MySqlConnector) getTableSchemaForTable(
}
columns := make([]*protos.FieldDescription, 0, len(rs.Values))
primary := make([]string, 0)

for _, field := range rs.Fields {
var qkind qvalue.QValueKind
switch field.Type {
case mysql.MYSQL_TYPE_DECIMAL:
qkind = qvalue.QValueKindNumeric
case mysql.MYSQL_TYPE_TINY:
qkind = qvalue.QValueKindInt16 // TODO qvalue.QValueKindInt8
case mysql.MYSQL_TYPE_SHORT:
qkind = qvalue.QValueKindInt16
case mysql.MYSQL_TYPE_LONG:
qkind = qvalue.QValueKindInt32
case mysql.MYSQL_TYPE_FLOAT:
qkind = qvalue.QValueKindFloat32
case mysql.MYSQL_TYPE_DOUBLE:
qkind = qvalue.QValueKindFloat64
case mysql.MYSQL_TYPE_NULL:
qkind = qvalue.QValueKindInvalid // TODO qvalue.QValueKindNothing
case mysql.MYSQL_TYPE_TIMESTAMP:
qkind = qvalue.QValueKindTimestamp
case mysql.MYSQL_TYPE_LONGLONG:
qkind = qvalue.QValueKindInt64
case mysql.MYSQL_TYPE_INT24:
qkind = qvalue.QValueKindInt32
case mysql.MYSQL_TYPE_DATE:
qkind = qvalue.QValueKindDate
case mysql.MYSQL_TYPE_TIME:
qkind = qvalue.QValueKindTime
case mysql.MYSQL_TYPE_DATETIME:
qkind = qvalue.QValueKindTimestamp
case mysql.MYSQL_TYPE_YEAR:
qkind = qvalue.QValueKindInt16
case mysql.MYSQL_TYPE_NEWDATE:
qkind = qvalue.QValueKindDate
case mysql.MYSQL_TYPE_VARCHAR:
qkind = qvalue.QValueKindString
case mysql.MYSQL_TYPE_BIT:
qkind = qvalue.QValueKindInt64
case mysql.MYSQL_TYPE_TIMESTAMP2:
qkind = qvalue.QValueKindTimestamp
case mysql.MYSQL_TYPE_DATETIME2:
qkind = qvalue.QValueKindTimestamp
case mysql.MYSQL_TYPE_TIME2:
qkind = qvalue.QValueKindTime
case mysql.MYSQL_TYPE_JSON:
qkind = qvalue.QValueKindJSON
case mysql.MYSQL_TYPE_NEWDECIMAL:
qkind = qvalue.QValueKindNumeric
case mysql.MYSQL_TYPE_ENUM:
qkind = qvalue.QValueKindInt64
case mysql.MYSQL_TYPE_SET:
qkind = qvalue.QValueKindInt64
case mysql.MYSQL_TYPE_TINY_BLOB:
qkind = qvalue.QValueKindBytes
case mysql.MYSQL_TYPE_MEDIUM_BLOB:
qkind = qvalue.QValueKindBytes
case mysql.MYSQL_TYPE_LONG_BLOB:
qkind = qvalue.QValueKindBytes
case mysql.MYSQL_TYPE_BLOB:
qkind = qvalue.QValueKindBytes
case mysql.MYSQL_TYPE_VAR_STRING:
qkind = qvalue.QValueKindString
case mysql.MYSQL_TYPE_STRING:
qkind = qvalue.QValueKindString
case mysql.MYSQL_TYPE_GEOMETRY:
qkind = qvalue.QValueKindGeometry
default:
return nil, fmt.Errorf("unknown mysql type %d", field.Type)
qkind, err := qkindFromMysql(field.Type)
if err != nil {
return nil, err
}

column := &protos.FieldDescription{
Name: string(field.Name),
Type: string(qkind),
Expand Down Expand Up @@ -242,61 +181,6 @@ func (c *MySqlConnector) RemoveTablesFromPublication(ctx context.Context, req *p
return nil
}

func qvalueFromMysql(mytype byte, qkind qvalue.QValueKind, val any) qvalue.QValue {
// TODO signedness, in ev.Table, need to extend QValue system
// See go-mysql row_event.go for mapping
switch val := val.(type) {
case nil:
return qvalue.QValueNull(qkind)
case int8: // TODO qvalue.Int8
return qvalue.QValueInt16{Val: int16(val)}
case int16:
return qvalue.QValueInt16{Val: val}
case int32:
return qvalue.QValueInt32{Val: val}
case int64:
return qvalue.QValueInt64{Val: val}
case float32:
return qvalue.QValueFloat32{Val: val}
case float64:
return qvalue.QValueFloat64{Val: val}
case decimal.Decimal:
return qvalue.QValueNumeric{Val: val}
case int:
// YEAR: https://dev.mysql.com/doc/refman/8.4/en/year.html
return qvalue.QValueInt16{Val: int16(val)}
case time.Time:
return qvalue.QValueTimestamp{Val: val}
case *replication.JsonDiff:
// TODO support somehow??
return qvalue.QValueNull(qvalue.QValueKindJSON)
case []byte:
switch mytype {
case mysql.MYSQL_TYPE_BLOB:
return qvalue.QValueBytes{Val: val}
case mysql.MYSQL_TYPE_JSON:
return qvalue.QValueJSON{Val: string(val)}
case mysql.MYSQL_TYPE_GEOMETRY:
// TODO figure out mysql geo encoding
return qvalue.QValueGeometry{Val: string(val)}
}
case string:
switch mytype {
case mysql.MYSQL_TYPE_TIME:
// TODO parse
case mysql.MYSQL_TYPE_TIME2:
// TODO parse
case mysql.MYSQL_TYPE_DATE:
// TODO parse
case mysql.MYSQL_TYPE_VARCHAR,
mysql.MYSQL_TYPE_VAR_STRING,
mysql.MYSQL_TYPE_STRING:
return qvalue.QValueString{Val: val}
}
}
panic(fmt.Sprintf("unexpected type %T for mysql type %d", val, mytype))
}

func (c *MySqlConnector) PullRecords(
ctx context.Context,
catalogPool *pgxpool.Pool,
Expand Down Expand Up @@ -372,7 +256,7 @@ func (c *MySqlConnector) PullRecords(
items := model.NewRecordItems(len(row))
for idx, val := range row {
fd := schema.Columns[idx]
items.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
items.AddColumn(fd.Name, qvalueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
}

recordCount += 1
Expand All @@ -392,13 +276,13 @@ func (c *MySqlConnector) PullRecords(
oldItems := model.NewRecordItems(len(oldRow))
for idx, val := range oldRow {
fd := schema.Columns[idx]
oldItems.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
oldItems.AddColumn(fd.Name, qvalueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
}
newRow := ev.Rows[idx+1]
newItems := model.NewRecordItems(len(newRow))
for idx, val := range ev.Rows[idx+1] {
fd := schema.Columns[idx]
newItems.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
newItems.AddColumn(fd.Name, qvalueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
}

recordCount += 1
Expand All @@ -417,7 +301,7 @@ func (c *MySqlConnector) PullRecords(
items := model.NewRecordItems(len(row))
for idx, val := range row {
fd := schema.Columns[idx]
items.AddColumn(fd.Name, qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
items.AddColumn(fd.Name, qvalueFromMysqlRowEvent(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val))
}

recordCount += 1
Expand All @@ -440,3 +324,58 @@ func (c *MySqlConnector) PullRecords(
}
}
}

func qvalueFromMysqlRowEvent(mytype byte, qkind qvalue.QValueKind, val any) qvalue.QValue {
// TODO signedness, in ev.Table, need to extend QValue system
// See go-mysql row_event.go for mapping
switch val := val.(type) {
case nil:
return qvalue.QValueNull(qkind)
case int8: // TODO qvalue.Int8
return qvalue.QValueInt16{Val: int16(val)}
case int16:
return qvalue.QValueInt16{Val: val}
case int32:
return qvalue.QValueInt32{Val: val}
case int64:
return qvalue.QValueInt64{Val: val}
case float32:
return qvalue.QValueFloat32{Val: val}
case float64:
return qvalue.QValueFloat64{Val: val}
case decimal.Decimal:
return qvalue.QValueNumeric{Val: val}
case int:
// YEAR: https://dev.mysql.com/doc/refman/8.4/en/year.html
return qvalue.QValueInt16{Val: int16(val)}
case time.Time:
return qvalue.QValueTimestamp{Val: val}
case *replication.JsonDiff:
// TODO support somehow??
return qvalue.QValueNull(qvalue.QValueKindJSON)
case []byte:
switch mytype {
case mysql.MYSQL_TYPE_BLOB:
return qvalue.QValueBytes{Val: val}
case mysql.MYSQL_TYPE_JSON:
return qvalue.QValueJSON{Val: string(val)}
case mysql.MYSQL_TYPE_GEOMETRY:
// TODO figure out mysql geo encoding
return qvalue.QValueGeometry{Val: string(val)}
}
case string:
switch mytype {
case mysql.MYSQL_TYPE_TIME:
// TODO parse
case mysql.MYSQL_TYPE_TIME2:
// TODO parse
case mysql.MYSQL_TYPE_DATE:
// TODO parse
case mysql.MYSQL_TYPE_VARCHAR,
mysql.MYSQL_TYPE_VAR_STRING,
mysql.MYSQL_TYPE_STRING:
return qvalue.QValueString{Val: val}
}
}
panic(fmt.Sprintf("unexpected type %T for mysql type %d", val, mytype))
}
116 changes: 116 additions & 0 deletions flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package connmysql
import (
"context"
"crypto/tls"
"errors"
"fmt"
"log/slog"
"time"
Expand All @@ -16,6 +17,7 @@ import (

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -76,6 +78,7 @@ func (c *MySqlConnector) connect(ctx context.Context, options ...client.Option)
func (c *MySqlConnector) Execute(ctx context.Context, cmd string, args ...interface{}) (*mysql.Result, error) {
reconnects := 3
for {
// TODO need new connection if ctx changes between calls, or make upstream PR
if c.conn == nil {
var err error
var argF []client.Option
Expand Down Expand Up @@ -154,3 +157,116 @@ func (c *MySqlConnector) GetVersion(ctx context.Context) (string, error) {
c.logger.Info("[mysql] version", slog.String("version", version))
return version, nil
}

func qkindFromMysql(ty uint8) (qvalue.QValueKind, error) {
switch ty {
case mysql.MYSQL_TYPE_DECIMAL:
return qvalue.QValueKindNumeric, nil
case mysql.MYSQL_TYPE_TINY:
return qvalue.QValueKindInt16, nil // TODO qvalue.QValueKindInt8
case mysql.MYSQL_TYPE_SHORT:
return qvalue.QValueKindInt16, nil
case mysql.MYSQL_TYPE_LONG:
return qvalue.QValueKindInt32, nil
case mysql.MYSQL_TYPE_FLOAT:
return qvalue.QValueKindFloat32, nil
case mysql.MYSQL_TYPE_DOUBLE:
return qvalue.QValueKindFloat64, nil
case mysql.MYSQL_TYPE_NULL:
return qvalue.QValueKindInvalid, nil // TODO qvalue.QValueKindNothing
case mysql.MYSQL_TYPE_TIMESTAMP:
return qvalue.QValueKindTimestamp, nil
case mysql.MYSQL_TYPE_LONGLONG:
return qvalue.QValueKindInt64, nil
case mysql.MYSQL_TYPE_INT24:
return qvalue.QValueKindInt32, nil
case mysql.MYSQL_TYPE_DATE:
return qvalue.QValueKindDate, nil
case mysql.MYSQL_TYPE_TIME:
return qvalue.QValueKindTime, nil
case mysql.MYSQL_TYPE_DATETIME:
return qvalue.QValueKindTimestamp, nil
case mysql.MYSQL_TYPE_YEAR:
return qvalue.QValueKindInt16, nil
case mysql.MYSQL_TYPE_NEWDATE:
return qvalue.QValueKindDate, nil
case mysql.MYSQL_TYPE_VARCHAR:
return qvalue.QValueKindString, nil
case mysql.MYSQL_TYPE_BIT:
return qvalue.QValueKindInt64, nil
case mysql.MYSQL_TYPE_TIMESTAMP2:
return qvalue.QValueKindTimestamp, nil
case mysql.MYSQL_TYPE_DATETIME2:
return qvalue.QValueKindTimestamp, nil
case mysql.MYSQL_TYPE_TIME2:
return qvalue.QValueKindTime, nil
case mysql.MYSQL_TYPE_JSON:
return qvalue.QValueKindJSON, nil
case mysql.MYSQL_TYPE_NEWDECIMAL:
return qvalue.QValueKindNumeric, nil
case mysql.MYSQL_TYPE_ENUM:
return qvalue.QValueKindInt64, nil
case mysql.MYSQL_TYPE_SET:
return qvalue.QValueKindInt64, nil
case mysql.MYSQL_TYPE_TINY_BLOB:
return qvalue.QValueKindBytes, nil
case mysql.MYSQL_TYPE_MEDIUM_BLOB:
return qvalue.QValueKindBytes, nil
case mysql.MYSQL_TYPE_LONG_BLOB:
return qvalue.QValueKindBytes, nil
case mysql.MYSQL_TYPE_BLOB:
return qvalue.QValueKindBytes, nil
case mysql.MYSQL_TYPE_VAR_STRING:
return qvalue.QValueKindString, nil
case mysql.MYSQL_TYPE_STRING:
return qvalue.QValueKindString, nil
case mysql.MYSQL_TYPE_GEOMETRY:
return qvalue.QValueKindGeometry, nil
default:
return qvalue.QValueKind(""), fmt.Errorf("unknown mysql type %d", ty)
}
}

func qvalueFromMysqlFieldValue(qkind qvalue.QValueKind, fv mysql.FieldValue) (qvalue.QValue, error) {
// TODO fill this in, maybe contribute upstream, figvure out how numeric etc fit in
switch v := fv.Value().(type) {
case nil:
return qvalue.QValueNull(qkind), nil
case uint64:
// TODO unsigned integers
return nil, errors.New("mysql unsigned integers not supported")
case int64:
switch qkind {
case qvalue.QValueKindInt16:
return qvalue.QValueInt16{Val: int16(v)}, nil
case qvalue.QValueKindInt32:
return qvalue.QValueInt32{Val: int32(v)}, nil
case qvalue.QValueKindInt64:
return qvalue.QValueInt64{Val: v}, nil
default:
return nil, fmt.Errorf("cannot convert int to %s", qkind)
}
case float64:
switch qkind {
case qvalue.QValueKindFloat32:
return qvalue.QValueFloat32{Val: float32(v)}, nil
case qvalue.QValueKindFloat64:
return qvalue.QValueFloat64{Val: float64(v)}, nil
default:
return nil, fmt.Errorf("cannot convert float to %s", qkind)
}
case string:
switch qkind {
case qvalue.QValueKindString:
return qvalue.QValueString{Val: v}, nil
case qvalue.QValueKindBytes:
return qvalue.QValueBytes{Val: []byte(v)}, nil
case qvalue.QValueKindJSON:
return qvalue.QValueJSON{Val: v}, nil
default:
return nil, fmt.Errorf("cannot convert string to %s", qkind)
}
default:
return nil, fmt.Errorf("unexpected mysql type %T", v)
}
}
Loading

0 comments on commit f02aa2d

Please sign in to comment.