Skip to content

Commit

Permalink
Require and support configuring JSON transform
Browse files Browse the repository at this point in the history
Add commands "CREATE DATA MAPPING" and "LIST data_mappings".  Disable
JSON transform by default unless mappings are created.
  • Loading branch information
nassibnassar committed Sep 18, 2024
1 parent cc39697 commit 21b9da5
Show file tree
Hide file tree
Showing 19 changed files with 451 additions and 189 deletions.
5 changes: 2 additions & 3 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ usage() {
echo ' go install github.com/kisielk/errcheck@latest'
echo '-v Enable verbose output'
# echo '-D Enable "-tags dynamic" compiler option'
echo '-X Build with experimental code included'
# echo '-X Build with experimental code included'
}

while getopts 'fhJtvTXD' flag; do
while getopts 'fhJtvTD' flag; do
case "${flag}" in
t) runtests='true' ;;
T) runalltests='true' ;;
Expand All @@ -36,7 +36,6 @@ while getopts 'fhJtvTXD' flag; do
h) usage
exit 1 ;;
v) verbose='true' ;;
X) experiment='true' ;;
D) tagsdynamic='true' ;;
*) usage
exit 1 ;;
Expand Down
11 changes: 11 additions & 0 deletions cmd/metadb/ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ type CreateDataSourceStmt struct {
func (*CreateDataSourceStmt) node() {}
func (*CreateDataSourceStmt) stmtNode() {}

// CreateDataMappingStmt is the syntax tree node for a statement that creates
// a data mapping.  All of its fields are plain strings.
//
// NOTE(review): the field meanings are inferred from their names only
// (mapping type, target table and column, path within the column's data, and
// the identifier the path maps to) -- confirm against the parser.
type CreateDataMappingStmt struct {
	TypeName, TableName, ColumnName string
	Path, TargetIdentifier          string
}

// node and stmtNode are empty marker methods.
func (*CreateDataMappingStmt) node()     {}
func (*CreateDataMappingStmt) stmtNode() {}

type CreateDataOriginStmt struct {
OriginName string
}
Expand Down
20 changes: 20 additions & 0 deletions cmd/metadb/catalog/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/metadb-project/metadb/cmd/metadb/config"
"github.com/metadb-project/metadb/cmd/metadb/dbx"
"github.com/metadb-project/metadb/cmd/metadb/log"
"github.com/metadb-project/metadb/cmd/metadb/util"
Expand All @@ -25,6 +26,7 @@ type Catalog struct {
users map[string]*util.RegexList
columns map[dbx.Column]string
indexes map[dbx.Column]struct{}
jsonTransform map[config.JSONPath]string
lastSnapshotRecord time.Time
dp *pgxpool.Pool
lz4 bool
Expand Down Expand Up @@ -66,6 +68,9 @@ func Initialize(db *dbx.DB, dp *pgxpool.Pool) (*Catalog, error) {
if err := c.initIndexes(); err != nil {
return nil, err
}
if err := c.initJSON(); err != nil {
return nil, err
}
c.initSnapshot()
c.lz4 = isLZ4Available(c.dp)

Expand Down Expand Up @@ -138,6 +143,7 @@ var systemTables = []systemTableDef{
{table: dbx.Table{Schema: catalogSchema, Table: "source"}, create: createTableSource},
{table: dbx.Table{Schema: catalogSchema, Table: "table_update"}, create: createTableUpdate},
{table: dbx.Table{Schema: catalogSchema, Table: "base_table"}, create: createTableBaseTable},
{table: dbx.Table{Schema: catalogSchema, Table: "transform_json"}, create: createTableJSON},
}

//func SystemTables() []dbx.Table {
Expand Down Expand Up @@ -285,6 +291,20 @@ func createTableBaseTable(tx pgx.Tx) error {
return nil
}

// createTableJSON creates the transform_json table in the catalog schema,
// executing the statement on tx.  Rows are keyed by (schema_name,
// table_name, column_name, path) and carry a non-null "map" text value.
// Any error from executing the statement is returned wrapped.
func createTableJSON(tx pgx.Tx) error {
	table := catalogSchema + ".transform_json"
	stmt := "CREATE TABLE " + table + " (" +
		"schema_name varchar(63) NOT NULL, " +
		"table_name varchar(63) NOT NULL, " +
		"column_name varchar(63) NOT NULL, " +
		"path text NOT NULL, " +
		"PRIMARY KEY (schema_name, table_name, column_name, path), " +
		"map text NOT NULL)"
	_, err := tx.Exec(context.TODO(), stmt)
	if err != nil {
		return fmt.Errorf("creating table "+table+": %w", err)
	}
	return nil
}

func (c *Catalog) TableUpdatedNow(table dbx.Table, elapsedTime time.Duration) error {
realtime := float32(math.Round(elapsedTime.Seconds()*10000) / 10000)
u := catalogSchema + ".table_update"
Expand Down
37 changes: 37 additions & 0 deletions cmd/metadb/catalog/jsoncfg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package catalog

import (
"context"
"fmt"

"github.com/metadb-project/metadb/cmd/metadb/config"
)

// initJSON reads every row of metadb.transform_json and stores the result in
// c.jsonTransform, keyed by the JSONPath built from the row's schema, table,
// column, and path, with the row's map column as the value.  The map is
// assigned to the catalog only after all rows have been read successfully;
// on any query or scan error the catalog is left unchanged and the wrapped
// error is returned.
func (c *Catalog) initJSON() error {
	const query = "SELECT schema_name, table_name, column_name, path, map FROM metadb.transform_json"
	rows, err := c.dp.Query(context.TODO(), query)
	if err != nil {
		return fmt.Errorf("selecting json configuration: %w", err)
	}
	defer rows.Close()

	transforms := make(map[config.JSONPath]string)
	for rows.Next() {
		var (
			schemaName, tableName, columnName string
			jsonPath, mapping                 string
		)
		if err := rows.Scan(&schemaName, &tableName, &columnName, &jsonPath, &mapping); err != nil {
			return fmt.Errorf("reading json configuration: %w", err)
		}
		key := config.NewJSONPath(schemaName, tableName, columnName, jsonPath)
		transforms[key] = mapping
	}
	// rows.Next returning false may mean an error rather than end of data.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("reading json configuration: %w", err)
	}

	c.jsonTransform = transforms
	return nil
}

// JSONPathLookup returns the value stored in the catalog's JSON transform
// map for path, or the empty string if there is no entry.  The lookup is
// performed while holding c.mu.
func (c *Catalog) JSONPathLookup(path config.JSONPath) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	mapping := c.jsonTransform[path]
	return mapping
}
17 changes: 17 additions & 0 deletions cmd/metadb/catalog/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,23 @@ func createSchemaIfNotExists(c *Catalog, table *dbx.Table) error {
return nil
}

// IsReservedColumn reports whether column is exactly one of the reserved
// column names "__id", "__start", "__end", "__current", or "__origin".
// The comparison is case-sensitive.
func IsReservedColumn(column string) bool {
	switch column {
	case "__id", "__start", "__end", "__current", "__origin":
		return true
	}
	return false
}

func createMainTableIfNotExists(c *Catalog, table *dbx.Table) error {
q := "CREATE TABLE IF NOT EXISTS " + table.MainSQL() + " (" +
"__id bigint GENERATED BY DEFAULT AS IDENTITY, " +
Expand Down
49 changes: 4 additions & 45 deletions cmd/metadb/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,47 +189,6 @@ func NewCommandGraph() *CommandGraph {
return &CommandGraph{Commands: list.New()}
}

// String returns a multi-line textual rendering of the graph's commands,
// produced by writeCommands starting at indentation 0.
func (g *CommandGraph) String() string {
	sb := new(strings.Builder)
	g.writeCommands(sb, g.Commands, 0)
	return sb.String()
}

// writeCommands appends a rendering of every command in commands to b.
//
// Each command produces a header line "> Op Schema.Table" preceded by indent
// spaces, followed by one line per column preceded by indent+8 spaces.  A
// column line starts with "-" when its PrimaryKey is 0 and "= [n]" otherwise,
// then the column name, its SQL type, and its data ("null" when SQLData is
// nil; data longer than 40 bytes is cut to 40 bytes plus "...").
// Subcommands are rendered recursively with indent increased by 4.  A nil
// list writes nothing.
func (g *CommandGraph) writeCommands(b *strings.Builder, commands *list.List, indent int) {
	if commands == nil {
		return
	}
	// pad writes n spaces to b; it writes nothing when n is not positive.
	pad := func(n int) {
		for ; n > 0; n-- {
			b.WriteRune(' ')
		}
	}
	for elem := commands.Front(); elem != nil; elem = elem.Next() {
		pad(indent)
		cmd := elem.Value.(*Command)
		fmt.Fprintf(b, "> %s %s.%s\n", cmd.Op, cmd.SchemaName, cmd.TableName)
		for i := range cmd.Column {
			col := &cmd.Column[i]
			pad(indent + 8)
			if col.PrimaryKey != 0 {
				fmt.Fprintf(b, "= [%d]", col.PrimaryKey)
			} else {
				b.WriteRune('-')
			}
			fmt.Fprintf(b, " %s (%s): ", col.Name, DataTypeToSQL(col.DType, col.DTypeSize))
			if col.SQLData == nil {
				b.WriteString("null")
			} else {
				text := *col.SQLData
				if len(text) > 40 {
					text = text[:40] + "..."
				}
				fmt.Fprintf(b, "%s", text)
			}
			b.WriteRune('\n')
		}
		g.writeCommands(b, cmd.Subcommands, indent+4)
	}
}

type Command struct {
Op Operation
SchemaName string
Expand Down Expand Up @@ -632,7 +591,7 @@ func NewCommand(dedup *log.MessageSet, ce *change.Event, schemaPassFilter, schem
if ce.Value == nil || ce.Value.Payload == nil {
var name string
var key interface{}
if ce != nil && ce.Key != nil {
if ce.Key != nil {
name = *ce.Key.Schema.Name
key = ce.Key.Payload
}
Expand Down Expand Up @@ -993,9 +952,9 @@ func trimFractionalZeros(s string) string {

func PrimaryKeyColumns(columns []CommandColumn) []CommandColumn {
var pkey []CommandColumn
for _, col := range columns {
if col.PrimaryKey != 0 {
pkey = append(pkey, col)
for i := range columns {
if columns[i].PrimaryKey != 0 {
pkey = append(pkey, columns[i])
}
}
sort.Slice(pkey, func(i, j int) bool {
Expand Down
45 changes: 45 additions & 0 deletions cmd/metadb/config/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package config

import (
"fmt"
"strings"
)

// JSONPath identifies a node inside JSON data held in a specific column of a
// specific table.  It uses a fixed-size array rather than a slice for the
// path so that the whole value is comparable and can serve as a map key.
type JSONPath struct {
	Schema string
	Table  string
	Column string
	// Path holds the path nodes below the root, in order.  Unused trailing
	// elements are empty strings; the first empty element marks the end.
	Path [16]string
}

// NewJSONPath builds a JSONPath for the given schema, table, and column from
// a dot-separated path string.  The first dot-separated element of path (the
// root, e.g. "$" in "$.a.b") is skipped and the remaining elements are
// stored in order.  An empty path yields a JSONPath with no nodes.
//
// It panics, with the same message as Append, if path contains more nodes
// below the root than the Path array can hold.
func NewJSONPath(schema, table, column string, path string) JSONPath {
	k := JSONPath{
		Schema: schema,
		Table:  table,
		Column: column,
	}
	if path == "" {
		return k
	}
	// Split on a non-empty string always yields at least one element, so
	// dropping the root element is safe.
	nodes := strings.Split(path, ".")[1:]
	if len(nodes) > len(k.Path) {
		// Fail with a descriptive message instead of an opaque
		// index-out-of-range runtime error.
		panic(fmt.Sprintf("JSON path exceeds limit of %d nodes", len(k.Path)))
	}
	copy(k.Path[:], nodes)
	return k
}

// Append returns a copy of j with node stored in the first empty Path
// element.  Elements of j.Path after that first empty element are not
// carried over.  The receiver is not modified.
//
// It panics if every Path element is already in use.
func (j JSONPath) Append(node string) JSONPath {
	k := JSONPath{
		Schema: j.Schema,
		Table:  j.Table,
		Column: j.Column,
	}
	for i := range j.Path {
		if j.Path[i] == "" {
			k.Path[i] = node
			return k
		}
		k.Path[i] = j.Path[i]
	}
	panic(fmt.Sprintf("JSON path exceeds limit of %d nodes", len(j.Path)))
}
Loading

0 comments on commit 21b9da5

Please sign in to comment.