materialize-bigquery: bound merge query ranges on observed keys
Implements the merge query optimization for BigQuery, where the minimum and
maximum observed values for key fields are included in the merge predicate.
williamhbaker committed Jan 2, 2025
1 parent bde0694 commit 06228b2
Showing 6 changed files with 89 additions and 38 deletions.
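
A minimal sketch of the technique, assuming a single int64 key column for brevity (the real sql.MergeBoundsBuilder in materialize-sql handles tuples of typed key values and dialect-specific literal rendering): while a transaction's documents are spooled for storing, track the smallest and largest key value seen, then splice those extremes into the MERGE join predicate so BigQuery can prune clustered or partitioned data.

```go
package main

import "fmt"

// boundsTracker is a simplified stand-in for sql.MergeBoundsBuilder: it
// records the minimum and maximum key value observed across one round of
// stored documents.
type boundsTracker struct {
	seen         bool
	lower, upper int64
}

func (b *boundsTracker) next(k int64) {
	if !b.seen || k < b.lower {
		b.lower = k
	}
	if !b.seen || k > b.upper {
		b.upper = k
	}
	b.seen = true
}

// predicate renders the MERGE join condition, appending range bounds when
// any rows were observed so BigQuery can prune clustered or partitioned
// data instead of scanning the whole target table.
func (b *boundsTracker) predicate(col string) string {
	eq := fmt.Sprintf("l.%s = r.c0", col)
	if !b.seen {
		return eq
	}
	return fmt.Sprintf("%s AND l.%s >= %d AND l.%s <= %d", eq, col, b.lower, col, b.upper)
}

func main() {
	var b boundsTracker
	for _, k := range []int64{42, 10, 100} {
		b.next(k)
	}
	fmt.Println(b.predicate("theKey"))
	// Prints: l.theKey = r.c0 AND l.theKey >= 10 AND l.theKey <= 100
}
```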
40 changes: 16 additions & 24 deletions materialize-bigquery/.snapshots/TestSQLGeneration
@@ -59,30 +59,6 @@ INSERT INTO projectID.dataset.delta_updates (theKey, aValue, flow_published_at)
SELECT c0, c1, c2 FROM flow_temp_table_1;
--- End projectID.dataset.delta_updates storeInsert ---

--- Begin projectID.dataset.key_value storeUpdate ---
MERGE INTO projectID.dataset.key_value AS l
USING flow_temp_table_0 AS r
ON l.key1 = r.c0 AND l.key2 = r.c1 AND l.key_binary = r.c2
WHEN MATCHED AND r.c19='"delete"' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET l.`array` = r.c3, l.binary = r.c4, l.boolean = r.c5, l.flow_published_at = r.c6, l.integer = r.c7, l.integerGt64Bit = r.c8, l.integerWithUserDDL = r.c9, l.multiple = r.c10, l.number = r.c11, l.numberCastToString = r.c12, l.object = r.c13, l.string = r.c14, l.stringInteger = r.c15, l.stringInteger39Chars = r.c16, l.stringInteger66Chars = r.c17, l.stringNumber = r.c18, l.flow_document = r.c19
WHEN NOT MATCHED THEN
INSERT (key1, key2, key_binary, `array`, binary, boolean, flow_published_at, integer, integerGt64Bit, integerWithUserDDL, multiple, number, numberCastToString, object, string, stringInteger, stringInteger39Chars, stringInteger66Chars, stringNumber, flow_document)
VALUES (r.c0, r.c1, r.c2, r.c3, r.c4, r.c5, r.c6, r.c7, r.c8, r.c9, r.c10, r.c11, r.c12, r.c13, r.c14, r.c15, r.c16, r.c17, r.c18, r.c19);
--- End projectID.dataset.key_value storeUpdate ---

--- Begin projectID.dataset.delta_updates storeUpdate ---
MERGE INTO projectID.dataset.delta_updates AS l
USING flow_temp_table_1 AS r
ON l.theKey = r.c0
WHEN MATCHED THEN
UPDATE SET l.aValue = r.c1, l.flow_published_at = r.c2
WHEN NOT MATCHED THEN
INSERT (theKey, aValue, flow_published_at)
VALUES (r.c0, r.c1, r.c2);
--- End projectID.dataset.delta_updates storeUpdate ---

--- Begin alter table add columns and drop not nulls ---
ALTER TABLE projectID.dataset.key_value
ADD COLUMN first_new_column STRING,
@@ -176,4 +152,20 @@ UPDATE path.`to`.checkpoints
AND fence=123;
--- End Fence Update ---

--- Begin projectID.dataset.key_value storeUpdate ---
MERGE INTO projectID.dataset.key_value AS l
USING flow_temp_table_0 AS r
ON
l.key1 = r.c0 AND l.key1 >= 10 AND l.key1 <= 100
AND l.key2 = r.c1
AND l.key_binary = r.c2 AND l.key_binary >= 'aGVsbG8K' AND l.key_binary <= 'Z29vZGJ5ZQo='
WHEN MATCHED AND r.c19='"delete"' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET l.`array` = r.c3, l.binary = r.c4, l.boolean = r.c5, l.flow_published_at = r.c6, l.integer = r.c7, l.integerGt64Bit = r.c8, l.integerWithUserDDL = r.c9, l.multiple = r.c10, l.number = r.c11, l.numberCastToString = r.c12, l.object = r.c13, l.string = r.c14, l.stringInteger = r.c15, l.stringInteger39Chars = r.c16, l.stringInteger66Chars = r.c17, l.stringNumber = r.c18, l.flow_document = r.c19
WHEN NOT MATCHED THEN
INSERT (key1, key2, key_binary, `array`, binary, boolean, flow_published_at, integer, integerGt64Bit, integerWithUserDDL, multiple, number, numberCastToString, object, string, stringInteger, stringInteger39Chars, stringInteger66Chars, stringNumber, flow_document)
VALUES (r.c0, r.c1, r.c2, r.c3, r.c4, r.c5, r.c6, r.c7, r.c8, r.c9, r.c10, r.c11, r.c12, r.c13, r.c14, r.c15, r.c16, r.c17, r.c18, r.c19);
--- End projectID.dataset.key_value storeUpdate ---

2 changes: 1 addition & 1 deletion materialize-bigquery/VERSION
@@ -1 +1 @@
v2
v3
2 changes: 1 addition & 1 deletion materialize-bigquery/binding.go
@@ -12,13 +12,13 @@ type binding struct {
target sql.Table
loadQuerySQL string
storeInsertSQL string
storeUpdateSQL string

loadFile *stagedFile
storeFile *stagedFile
tempTableName string
hasData bool
mustMerge bool
mergeBounds *sql.MergeBoundsBuilder
}

// bindingDocument is used by the load operation to fetch binding flow_document values
24 changes: 21 additions & 3 deletions materialize-bigquery/sqlgen.go
@@ -228,9 +228,10 @@ SELECT {{ range $ind, $col := $.Columns }}
{{ define "storeUpdate" -}}
MERGE INTO {{ $.Identifier }} AS l
USING {{ template "tempTableName" . }} AS r
ON {{ range $ind, $key := $.Keys }}
{{- if $ind }} AND {{end -}}
l.{{$key.Identifier}} = r.c{{$ind}}
ON {{ range $ind, $bound := $.Bounds }}
{{ if $ind -}} AND {{end -}}
l.{{$bound.Identifier}} = r.c{{$ind}}
{{- if $bound.LiteralLower }} AND l.{{ $bound.Identifier }} >= {{ $bound.LiteralLower }} AND l.{{ $bound.Identifier }} <= {{ $bound.LiteralUpper }}{{ end }}
{{- end}}
{{- if $.Document }}
WHEN MATCHED AND r.c{{ Add (len $.Columns) -1 }}='"delete"' THEN
@@ -338,3 +339,20 @@ UPDATE {{ Identifier $.TablePath }}
tplStoreInsert = tplAll.Lookup("storeInsert")
tplStoreUpdate = tplAll.Lookup("storeUpdate")
)

type mergeQueryInput struct {
sql.Table
Bounds []sql.MergeBound
}

func renderMergeQueryTemplate(table sql.Table, bounds []sql.MergeBound) (string, error) {
var w strings.Builder
if err := tplStoreUpdate.Execute(&w, &mergeQueryInput{
Table: table,
Bounds: bounds,
}); err != nil {
return "", err
}

return w.String(), nil
}
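
To see what the reworked ON clause renders to without running the connector, here is a self-contained sketch that copies just the bounds-iterating fragment of the storeUpdate template above. The MergeBound stand-in mirrors only the fields the template and the test below touch (Identifier, LiteralLower, LiteralUpper); the real type in materialize-sql may carry more:

```go
package main

import (
	"os"
	"text/template"
)

// MergeBound mirrors only the fields the storeUpdate template reads; the
// real sql.MergeBound type may differ.
type MergeBound struct {
	Identifier   string
	LiteralLower string
	LiteralUpper string
}

// onClause is a trimmed copy of the ON portion of the storeUpdate template.
const onClause = `ON {{ range $ind, $bound := . }}
{{ if $ind -}} AND {{end -}}
l.{{$bound.Identifier}} = r.c{{$ind}}
{{- if $bound.LiteralLower }} AND l.{{ $bound.Identifier }} >= {{ $bound.LiteralLower }} AND l.{{ $bound.Identifier }} <= {{ $bound.LiteralUpper }}{{ end }}
{{- end}}`

func main() {
	tpl := template.Must(template.New("on").Parse(onClause))
	bounds := []MergeBound{
		{Identifier: "key1", LiteralLower: "10", LiteralUpper: "100"},
		{Identifier: "key2"}, // unbounded, e.g. a boolean key
	}
	if err := tpl.Execute(os.Stdout, bounds); err != nil {
		panic(err)
	}
	// Renders:
	// ON
	// l.key1 = r.c0 AND l.key1 >= 10 AND l.key1 <= 100
	// AND l.key2 = r.c1
}
```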
38 changes: 36 additions & 2 deletions materialize-bigquery/sqlgen_test.go
@@ -6,10 +6,11 @@ import (

"github.com/bradleyjkemp/cupaloy"
sql "github.com/estuary/connectors/materialize-sql"
"github.com/stretchr/testify/require"
)

func TestSQLGeneration(t *testing.T) {
snap, _ := sql.RunSqlGenTests(
snap, tables := sql.RunSqlGenTests(
t,
bqDialect,
func(table string, delta bool) sql.Resource {
@@ -25,7 +26,6 @@ func TestSQLGeneration(t *testing.T) {
tplCreateTargetTable,
tplLoadQuery,
tplStoreInsert,
tplStoreUpdate,
},
TplAddColumns: tplAlterTableColumns,
TplDropNotNulls: tplAlterTableColumns,
@@ -35,5 +35,39 @@ },
},
)

{
tpl := tplStoreUpdate
tbl := tables[0]
require.False(t, tbl.DeltaUpdates)
var testcase = tbl.Identifier + " " + tpl.Name()

bounds := []sql.MergeBound{
{
Identifier: tbl.Keys[0].Identifier,
LiteralLower: bqDialect.Literal(int64(10)),
LiteralUpper: bqDialect.Literal(int64(100)),
},
{
Identifier: tbl.Keys[1].Identifier,
// No bounds - as would be the case for a boolean key, which
// would be a very weird key, but technically allowed.
},
{
Identifier: tbl.Keys[2].Identifier,
LiteralLower: bqDialect.Literal("aGVsbG8K"),
LiteralUpper: bqDialect.Literal("Z29vZGJ5ZQo="),
},
}

tf := mergeQueryInput{
Table: tbl,
Bounds: bounds,
}

snap.WriteString("--- Begin " + testcase + " ---\n")
require.NoError(t, tpl.Execute(snap, &tf))
snap.WriteString("--- End " + testcase + " ---\n\n")
}

cupaloy.SnapshotT(t, snap.String())
}
21 changes: 14 additions & 7 deletions materialize-bigquery/transactor.go
@@ -86,7 +86,7 @@ func newTransactor(
fieldSchemas[f.Name] = f
}

if err = t.addBinding(binding, fieldSchemas); err != nil {
if err = t.addBinding(binding, fieldSchemas, &ep.Dialect); err != nil {
return nil, nil, fmt.Errorf("addBinding of %s: %w", binding.Path, err)
}
}
@@ -103,7 +103,7 @@
return t, opts, nil
}

func (t *transactor) addBinding(target sql.Table, fieldSchemas map[string]*bigquery.FieldSchema) error {
func (t *transactor) addBinding(target sql.Table, fieldSchemas map[string]*bigquery.FieldSchema, dialect *sql.Dialect) error {
loadSchema, err := schemaForCols(target.KeyPtrs(), fieldSchemas)
if err != nil {
return err
@@ -115,9 +115,10 @@
}

b := &binding{
target: target,
loadFile: newStagedFile(t.client.cloudStorageClient, t.bucket, t.bucketPath, loadSchema),
storeFile: newStagedFile(t.client.cloudStorageClient, t.bucket, t.bucketPath, storeSchema),
target: target,
loadFile: newStagedFile(t.client.cloudStorageClient, t.bucket, t.bucketPath, loadSchema),
storeFile: newStagedFile(t.client.cloudStorageClient, t.bucket, t.bucketPath, storeSchema),
mergeBounds: sql.NewMergeBoundsBuilder(target, dialect.Literal),
}

for _, m := range []struct {
@@ -127,7 +128,6 @@ func (t *transactor) addBinding(target sql.Table, fieldSchemas map[string]*bigqu
{&b.tempTableName, tplTempTableName},
{&b.loadQuerySQL, tplLoadQuery},
{&b.storeInsertSQL, tplStoreInsert},
{&b.storeUpdateSQL, tplStoreUpdate},
} {
var err error
if *m.sql, err = sql.RenderTableTemplate(target, m.tpl); err != nil {
@@ -297,6 +297,7 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
} else if err = b.storeFile.encodeRow(ctx, converted); err != nil {
return nil, fmt.Errorf("encoding Store to scratch file: %w", err)
}
b.mergeBounds.NextStore(it.Key)
}
if it.Err() != nil {
return nil, it.Err()
@@ -358,7 +359,13 @@ func (t *transactor) commit(ctx context.Context, cleanupFiles []func(context.Con
if !b.mustMerge {
subqueries = append(subqueries, b.storeInsertSQL)
} else {
subqueries = append(subqueries, b.storeUpdateSQL)
if bounds, err := b.mergeBounds.Build(); err != nil {
return fmt.Errorf("building merge bounds: %w", err)
} else if mergeQuery, err := renderMergeQueryTemplate(b.target, bounds); err != nil {
return fmt.Errorf("rendering merge query template: %w", err)
} else {
subqueries = append(subqueries, mergeQuery)
}
}

// Reset for the next round.
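
The structural change in this file is that storeUpdateSQL used to be rendered once in addBinding and reused verbatim, whereas the merge query is now re-rendered at every commit with that round's observed key bounds. A toy, stubbed sketch of that per-round flow (the stub types, literals, and the no-rows error are illustrative assumptions, not the connector's real behavior):

```go
package main

import (
	"errors"
	"fmt"
)

// boundsBuilder stubs sql.MergeBoundsBuilder just enough to show the flow.
type boundsBuilder struct{ rows int }

// NextStore would widen the observed min/max for each key; here it only
// counts rows.
func (b *boundsBuilder) NextStore(key int64) { b.rows++ }

// Build returns rendered bound literals and resets for the next round.
func (b *boundsBuilder) Build() (string, error) {
	if b.rows == 0 {
		return "", errors.New("no stored rows") // assumed error case
	}
	b.rows = 0
	return "l.theKey >= 10 AND l.theKey <= 100", nil
}

func main() {
	b := &boundsBuilder{}

	// Store phase: every stored key passes through the builder.
	for _, k := range []int64{42, 10, 100} {
		b.NextStore(k)
	}

	// Commit phase: render the merge query fresh, with this round's bounds,
	// instead of reusing a statically templated storeUpdateSQL.
	var subqueries []string
	mustMerge := true
	if !mustMerge {
		subqueries = append(subqueries, "INSERT ...")
	} else if bounds, err := b.Build(); err != nil {
		panic(fmt.Errorf("building merge bounds: %w", err))
	} else {
		subqueries = append(subqueries, "MERGE ... ON l.theKey = r.c0 AND "+bounds)
	}
	fmt.Println(subqueries[0])
}
```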
