Skip to content

Commit

Permalink
Merge branch 'json-prefix'
Browse files Browse the repository at this point in the history
Adjust target names in JSON transform
  • Loading branch information
nassibnassar committed Sep 19, 2024
2 parents 21b9da5 + 5029ad8 commit 9b6dcf2
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 29 deletions.
3 changes: 2 additions & 1 deletion cmd/metadb/catalog/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ func createTableJSON(tx pgx.Tx) error {
"column_name varchar(63) NOT NULL, " +
"path text NOT NULL, " +
"PRIMARY KEY (schema_name, table_name, column_name, path), " +
"map text NOT NULL)"
"map text NOT NULL, " +
"UNIQUE (schema_name, table_name, column_name, map))"
if _, err := tx.Exec(context.TODO(), q); err != nil {
return fmt.Errorf("creating table "+catalogSchema+".transform_json: %w", err)
}
Expand Down
25 changes: 10 additions & 15 deletions cmd/metadb/jsonx/jsonx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@ func RewriteJSON(cat *catalog.Catalog, cmd *command.Command, column *command.Com
if !ok {
return nil
}
objectLevel := 1
arrayLevel := 0
table := cmd.TableName + "__" + tmap
rootkey := command.PrimaryKeyColumns(cmd.Column)
for i := range rootkey {
rootkey[i].Name = "__root__" + rootkey[i].Name
}
quasikey := make([]command.CommandColumn, 0)
deletions := make(map[string]struct{})
if err := rewriteExtendedObject(cat, cmd, objectLevel, arrayLevel, "", obj, table, rootkey, quasikey, path, deletions); err != nil {
if err := rewriteExtendedObject(cat, cmd, obj, table, rootkey, quasikey, path, deletions); err != nil {
return fmt.Errorf("rewrite json: %s", err)
}
return nil
Expand All @@ -52,10 +50,10 @@ func RewriteJSON(cat *catalog.Catalog, cmd *command.Command, column *command.Com
// indices are added in a column named with the prefix "__ord__". Primary key
// columns of the root command are included with the prefix "__root__" added to
// the column names.
func rewriteExtendedObject(cat *catalog.Catalog, cmd *command.Command, objectLevel, arrayLevel int, attrPrefix string, obj map[string]any, table string, rootkey, quasikey []command.CommandColumn, path config.JSONPath, deletions map[string]struct{}) error {
func rewriteExtendedObject(cat *catalog.Catalog, cmd *command.Command, obj map[string]any, table string, rootkey, quasikey []command.CommandColumn, path config.JSONPath, deletions map[string]struct{}) error {
cols := make([]command.CommandColumn, 0)
cols = append(cols, rootkey...)
if err := rewriteObject(cat, cmd, objectLevel, arrayLevel, attrPrefix, obj, table, &cols, rootkey, quasikey, path, deletions); err != nil {
if err := rewriteObject(cat, cmd, "", obj, table, &cols, rootkey, quasikey, path, deletions); err != nil {
return fmt.Errorf("rewrite json object: %s", err)
}
newcmd := &command.Command{
Expand All @@ -72,8 +70,8 @@ func rewriteExtendedObject(cat *catalog.Catalog, cmd *command.Command, objectLev
return nil
}

func rewriteObject(cat *catalog.Catalog, cmd *command.Command, objectLevel, arrayLevel int, attrPrefix string, obj map[string]any, table string, cols *[]command.CommandColumn, rootkey, quasikey []command.CommandColumn, path config.JSONPath, deletions map[string]struct{}) error {
if objectLevel > 5 {
func rewriteObject(cat *catalog.Catalog, cmd *command.Command, attrPrefix string, obj map[string]any, table string, cols *[]command.CommandColumn, rootkey, quasikey []command.CommandColumn, path config.JSONPath, deletions map[string]struct{}) error {
if path.Path[len(path.Path)-1] != "" {
return nil
}
qkey := slices.Clone(quasikey)
Expand Down Expand Up @@ -112,10 +110,7 @@ func rewriteObject(cat *catalog.Catalog, cmd *command.Command, objectLevel, arra
if t == "" {
continue
}
if arrayLevel > 0 {
t = attrPrefix + t
}
if err := rewriteArray(cat, cmd, objectLevel, arrayLevel+1, t, v, cmd.TableName+"__"+t, rootkey, qkey, path.Append(name), deletions); err != nil {
if err := rewriteArray(cat, cmd, t, v, cmd.TableName+"__"+t, rootkey, qkey, p, deletions); err != nil {
return err
}
case map[string]any:
Expand All @@ -124,16 +119,16 @@ func rewriteObject(cat *catalog.Catalog, cmd *command.Command, objectLevel, arra
if t == "" {
continue
}
if err := rewriteObject(cat, cmd, objectLevel+1, arrayLevel, attrPrefix+t+"__", v, table, cols, rootkey, qkey, p, deletions); err != nil {
if err := rewriteObject(cat, cmd, t+"__", v, table, cols, rootkey, qkey, p, deletions); err != nil {
return err
}
}
}
return nil
}

func rewriteArray(cat *catalog.Catalog, cmd *command.Command, objectLevel, arrayLevel int, aname string, adata []any, table string, rootkey, quasikey []command.CommandColumn, path config.JSONPath, deletions map[string]struct{}) error {
if arrayLevel > 3 {
func rewriteArray(cat *catalog.Catalog, cmd *command.Command, aname string, adata []any, table string, rootkey, quasikey []command.CommandColumn, path config.JSONPath, deletions map[string]struct{}) error {
if path.Path[len(path.Path)-1] != "" {
return nil
}
_, ok := deletions[table]
Expand Down Expand Up @@ -190,7 +185,7 @@ func rewriteArray(cat *catalog.Catalog, cmd *command.Command, objectLevel, array
case []any: // Not supported
continue
case map[string]any:
if err := rewriteObject(cat, cmd, objectLevel+1, arrayLevel, aname+"__", v, table, &cols, rootkey, qkey, path, deletions); err != nil {
if err := rewriteObject(cat, cmd, "", v, table, &cols, rootkey, qkey, path, deletions); err != nil {
return fmt.Errorf("rewrite json: %s", err)
}
}
Expand Down
26 changes: 18 additions & 8 deletions cmd/metadb/libpq/libpq.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func serve(conn net.Conn, backend *pgproto3.Backend, db *dbx.DB, sources *[]*sys
_ = err
return
}
log.Trace("*** %#v", msg)

switch m := msg.(type) {
case *pgproto3.Parse:
Expand Down Expand Up @@ -181,7 +180,6 @@ func processParse(conn net.Conn, backend *pgproto3.Backend, parse *pgproto3.Pars
if err != nil {
return fmt.Errorf("unexpected message in extended query: %w", err)
}
log.Trace("*** %#v", msg)

_ = stmt
switch m := msg.(type) {
Expand Down Expand Up @@ -409,11 +407,22 @@ func list(conn net.Conn, node *ast.ListStmt, dc *pgx.Conn, sources *[]*sysdb.Sou
" WHEN (tables='.*' AND dbupdated) THEN 'authorized'"+
" ELSE 'not authorized'"+
" END note"+
" FROM metadb.auth", nil, dc)
" FROM metadb.auth"+
" ORDER BY username", nil, dc)
case "data_mappings":
return proxySelect(conn, "SELECT 'json' AS mapping_type, schema_name||'.'||table_name AS table_name, column_name, path AS object_path, map AS target_identifier FROM metadb.transform_json", nil, dc)
return proxySelect(conn, ""+
"SELECT 'json' mapping_type,"+
" schema_name||'.'||table_name table_name,"+
" column_name,"+
" path object_path,"+
" map target_identifier"+
" FROM metadb.transform_json"+
" ORDER BY mapping_type, table_name, column_name, path", nil, dc)
case "data_origins":
return proxySelect(conn, "SELECT name FROM metadb.origin", nil, dc)
return proxySelect(conn, ""+
"SELECT name"+
" FROM metadb.origin"+
" ORDER BY name", nil, dc)
case "data_sources":
return proxySelect(conn, ""+
"SELECT name,"+
Expand All @@ -427,7 +436,8 @@ func list(conn net.Conn, node *ast.ListStmt, dc *pgx.Conn, sources *[]*sysdb.Sou
" trimschemaprefix,"+
" addschemaprefix,"+
" module"+
" FROM metadb.source", nil, dc)
" FROM metadb.source"+
" ORDER BY name", nil, dc)
case "status":
return listStatus(conn, sources)
default:
Expand Down Expand Up @@ -571,8 +581,8 @@ func createDataMapping(conn net.Conn, node *ast.CreateDataMappingStmt, dc *pgx.C
q := "INSERT INTO metadb.transform_json (schema_name, table_name, column_name, path, map) VALUES ($1, $2, $3, $4, $5)"
if _, err = dc.Exec(context.TODO(), q, table.Schema, table.Table, node.ColumnName, node.Path, node.TargetIdentifier); err != nil {
if strings.Contains(err.Error(), "duplicate key value violates unique constraint") {
return fmt.Errorf("mapping already exists from table %q, column %q, path %q",
node.TableName, node.ColumnName, node.Path)
return fmt.Errorf("JSON mapping from (table %q, column %q, path %q) to (%q) conflicts with an existing mapping",
node.TableName, node.ColumnName, node.Path, node.TargetIdentifier)
}
return errors.New(strings.TrimPrefix(err.Error(), "ERROR: "))
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/metadb/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -1699,7 +1699,8 @@ func updb25(opt *dbopt) error {
"column_name varchar(63) NOT NULL, " +
"path text NOT NULL, " +
"PRIMARY KEY (schema_name, table_name, column_name, path), " +
"map text NOT NULL)"
"map text NOT NULL, " +
"UNIQUE (schema_name, table_name, column_name, map))"
_, err = tx.Exec(context.TODO(), q)
if err != nil {
return fmt.Errorf("creating table metadb.transform_json: %w", err)
Expand Down
10 changes: 6 additions & 4 deletions doc/reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,9 @@ of a mapping is written to a new column or table based on the specified target
identifier.

In JSON mapping, the specified path identifies a JSON object or array to
transform. For example, the path `'$.a.b'` means an object or array named `b`
contained within an object or array named `a`. The path `'$'` means the
outermost enclosing object. Note that an object or array will not be
transform. For example, the path `'$.a.b'` is used to refer to an object or
array named `b` contained within an object or array named `a`. The path `'$'`
means the outermost enclosing object. Note that an object or array will not be
transformed unless all of its parents are also transformed; for example, a
mapping from path `'$.a.b'` will only take effect if mappings are also defined
for both the paths `'$.a'` and `'$'` within the same table and column.
Expand Down Expand Up @@ -522,7 +522,9 @@ take effect.
|Path to a JSON object or array.

|`'*_target_identifier_*'`
|A short identifier to be used in naming the transformed data.
|A short identifier to be used in naming the transformed data. It must be
unique for the transformed object; in other words, no two paths should be
mapped to the same target identifier.
|===

[discrete]
Expand Down

0 comments on commit 9b6dcf2

Please sign in to comment.