diff --git a/materialize-bigquery/.snapshots/TestSQLGeneration b/materialize-bigquery/.snapshots/TestSQLGeneration index e0ff6ae23a..bdf615754b 100644 --- a/materialize-bigquery/.snapshots/TestSQLGeneration +++ b/materialize-bigquery/.snapshots/TestSQLGeneration @@ -59,30 +59,6 @@ INSERT INTO projectID.dataset.delta_updates (theKey, aValue, flow_published_at) SELECT c0, c1, c2 FROM flow_temp_table_1; --- End projectID.dataset.delta_updates storeInsert --- ---- Begin projectID.dataset.key_value storeUpdate --- -MERGE INTO projectID.dataset.key_value AS l -USING flow_temp_table_0 AS r -ON l.key1 = r.c0 AND l.key2 = r.c1 AND l.key_binary = r.c2 -WHEN MATCHED AND r.c19='"delete"' THEN - DELETE -WHEN MATCHED THEN - UPDATE SET l.`array` = r.c3, l.binary = r.c4, l.boolean = r.c5, l.flow_published_at = r.c6, l.integer = r.c7, l.integerGt64Bit = r.c8, l.integerWithUserDDL = r.c9, l.multiple = r.c10, l.number = r.c11, l.numberCastToString = r.c12, l.object = r.c13, l.string = r.c14, l.stringInteger = r.c15, l.stringInteger39Chars = r.c16, l.stringInteger66Chars = r.c17, l.stringNumber = r.c18, l.flow_document = r.c19 -WHEN NOT MATCHED THEN - INSERT (key1, key2, key_binary, `array`, binary, boolean, flow_published_at, integer, integerGt64Bit, integerWithUserDDL, multiple, number, numberCastToString, object, string, stringInteger, stringInteger39Chars, stringInteger66Chars, stringNumber, flow_document) - VALUES (r.c0, r.c1, r.c2, r.c3, r.c4, r.c5, r.c6, r.c7, r.c8, r.c9, r.c10, r.c11, r.c12, r.c13, r.c14, r.c15, r.c16, r.c17, r.c18, r.c19); ---- End projectID.dataset.key_value storeUpdate --- - ---- Begin projectID.dataset.delta_updates storeUpdate --- -MERGE INTO projectID.dataset.delta_updates AS l -USING flow_temp_table_1 AS r -ON l.theKey = r.c0 -WHEN MATCHED THEN - UPDATE SET l.aValue = r.c1, l.flow_published_at = r.c2 -WHEN NOT MATCHED THEN - INSERT (theKey, aValue, flow_published_at) - VALUES (r.c0, r.c1, r.c2); ---- End projectID.dataset.delta_updates storeUpdate --- - --- Begin alter table add columns and drop not nulls --- ALTER TABLE projectID.dataset.key_value ADD COLUMN first_new_column STRING, @@ -176,4 +152,20 @@ UPDATE path.`to`.checkpoints AND fence=123; --- End Fence Update --- +--- Begin projectID.dataset.key_value storeUpdate --- +MERGE INTO projectID.dataset.key_value AS l +USING flow_temp_table_0 AS r +ON + l.key1 = r.c0 AND l.key1 >= 10 AND l.key1 <= 100 + AND l.key2 = r.c1 + AND l.key_binary = r.c2 AND l.key_binary >= 'aGVsbG8K' AND l.key_binary <= 'Z29vZGJ5ZQo=' +WHEN MATCHED AND r.c19='"delete"' THEN + DELETE +WHEN MATCHED THEN + UPDATE SET l.`array` = r.c3, l.binary = r.c4, l.boolean = r.c5, l.flow_published_at = r.c6, l.integer = r.c7, l.integerGt64Bit = r.c8, l.integerWithUserDDL = r.c9, l.multiple = r.c10, l.number = r.c11, l.numberCastToString = r.c12, l.object = r.c13, l.string = r.c14, l.stringInteger = r.c15, l.stringInteger39Chars = r.c16, l.stringInteger66Chars = r.c17, l.stringNumber = r.c18, l.flow_document = r.c19 +WHEN NOT MATCHED THEN + INSERT (key1, key2, key_binary, `array`, binary, boolean, flow_published_at, integer, integerGt64Bit, integerWithUserDDL, multiple, number, numberCastToString, object, string, stringInteger, stringInteger39Chars, stringInteger66Chars, stringNumber, flow_document) + VALUES (r.c0, r.c1, r.c2, r.c3, r.c4, r.c5, r.c6, r.c7, r.c8, r.c9, r.c10, r.c11, r.c12, r.c13, r.c14, r.c15, r.c16, r.c17, r.c18, r.c19); +--- End projectID.dataset.key_value storeUpdate --- + diff --git a/materialize-bigquery/VERSION b/materialize-bigquery/VERSION index 8c1384d825..29ef827e8a 100644 --- a/materialize-bigquery/VERSION +++ b/materialize-bigquery/VERSION @@ -1 +1 @@ -v2 +v3 diff --git a/materialize-bigquery/binding.go b/materialize-bigquery/binding.go index bbc200c6e6..4e97e6187c 100644 --- a/materialize-bigquery/binding.go +++ b/materialize-bigquery/binding.go @@ -12,13 +12,13 @@ type binding struct { target sql.Table loadQuerySQL string storeInsertSQL string - storeUpdateSQL string loadFile *stagedFile storeFile *stagedFile tempTableName string hasData bool mustMerge bool + mergeBounds *sql.MergeBoundsBuilder } // bindingDocument is used by the load operation to fetch binding flow_document values diff --git a/materialize-bigquery/sqlgen.go b/materialize-bigquery/sqlgen.go index 861630cbc1..8e35babb95 100644 --- a/materialize-bigquery/sqlgen.go +++ b/materialize-bigquery/sqlgen.go @@ -228,9 +228,10 @@ SELECT {{ range $ind, $col := $.Columns }} {{ define "storeUpdate" -}} MERGE INTO {{ $.Identifier }} AS l USING {{ template "tempTableName" . }} AS r -ON {{ range $ind, $key := $.Keys }} -{{- if $ind }} AND {{end -}} - l.{{$key.Identifier}} = r.c{{$ind}} +ON {{ range $ind, $bound := $.Bounds }} + {{ if $ind -}} AND {{end -}} + l.{{$bound.Identifier}} = r.c{{$ind}} + {{- if $bound.LiteralLower }} AND l.{{ $bound.Identifier }} >= {{ $bound.LiteralLower }} AND l.{{ $bound.Identifier }} <= {{ $bound.LiteralUpper }}{{ end }} {{- end}} {{- if $.Document }} WHEN MATCHED AND r.c{{ Add (len $.Columns) -1 }}='"delete"' THEN @@ -338,3 +339,20 @@ UPDATE {{ Identifier $.TablePath }} tplStoreInsert = tplAll.Lookup("storeInsert") tplStoreUpdate = tplAll.Lookup("storeUpdate") ) + +type mergeQueryInput struct { + sql.Table + Bounds []sql.MergeBound +} + +func renderMergeQueryTemplate(table sql.Table, bounds []sql.MergeBound) (string, error) { + var w strings.Builder + if err := tplStoreUpdate.Execute(&w, &mergeQueryInput{ + Table: table, + Bounds: bounds, + }); err != nil { + return "", err + } + + return w.String(), nil +} diff --git a/materialize-bigquery/sqlgen_test.go b/materialize-bigquery/sqlgen_test.go index 113c10c3f0..7996d7b743 100644 --- a/materialize-bigquery/sqlgen_test.go +++ b/materialize-bigquery/sqlgen_test.go @@ -6,10 +6,11 @@ import ( "github.com/bradleyjkemp/cupaloy" sql "github.com/estuary/connectors/materialize-sql" + "github.com/stretchr/testify/require" ) func TestSQLGeneration(t *testing.T) { - snap, _ := sql.RunSqlGenTests( + snap, tables := sql.RunSqlGenTests( t, bqDialect, func(table string, delta bool) sql.Resource { @@ -25,7 +26,6 @@ func TestSQLGeneration(t *testing.T) { tplCreateTargetTable, tplLoadQuery, tplStoreInsert, - tplStoreUpdate, }, TplAddColumns: tplAlterTableColumns, TplDropNotNulls: tplAlterTableColumns, @@ -35,5 +35,39 @@ func TestSQLGeneration(t *testing.T) { }, ) + { + tpl := tplStoreUpdate + tbl := tables[0] + require.False(t, tbl.DeltaUpdates) + var testcase = tbl.Identifier + " " + tpl.Name() + + bounds := []sql.MergeBound{ + { + Identifier: tbl.Keys[0].Identifier, + LiteralLower: bqDialect.Literal(int64(10)), + LiteralUpper: bqDialect.Literal(int64(100)), + }, + { + Identifier: tbl.Keys[1].Identifier, + // No bounds - as would be the case for a boolean key, which + // would be a very weird key, but technically allowed. + }, + { + Identifier: tbl.Keys[2].Identifier, + LiteralLower: bqDialect.Literal("aGVsbG8K"), + LiteralUpper: bqDialect.Literal("Z29vZGJ5ZQo="), + }, + } + + tf := mergeQueryInput{ + Table: tbl, + Bounds: bounds, + } + + snap.WriteString("--- Begin " + testcase + " ---\n") + require.NoError(t, tpl.Execute(snap, &tf)) + snap.WriteString("--- End " + testcase + " ---\n\n") + } + cupaloy.SnapshotT(t, snap.String()) } diff --git a/materialize-bigquery/transactor.go b/materialize-bigquery/transactor.go index 888cdf22fd..120546a682 100644 --- a/materialize-bigquery/transactor.go +++ b/materialize-bigquery/transactor.go @@ -86,7 +86,7 @@ func newTransactor( fieldSchemas[f.Name] = f } - if err = t.addBinding(binding, fieldSchemas); err != nil { + if err = t.addBinding(binding, fieldSchemas, &ep.Dialect); err != nil { return nil, nil, fmt.Errorf("addBinding of %s: %w", binding.Path, err) } } @@ -103,7 +103,7 @@ func newTransactor( return t, opts, nil } -func (t *transactor) addBinding(target sql.Table, fieldSchemas map[string]*bigquery.FieldSchema) error { +func (t *transactor) addBinding(target sql.Table, fieldSchemas map[string]*bigquery.FieldSchema, dialect *sql.Dialect) error { loadSchema, err := schemaForCols(target.KeyPtrs(), fieldSchemas) if err != nil { return err @@ -115,9 +115,10 @@ func (t *transactor) addBinding(target sql.Table, fieldSchemas map[string]*bigqu } b := &binding{ - target: target, - loadFile: newStagedFile(t.client.cloudStorageClient, t.bucket, t.bucketPath, loadSchema), - storeFile: newStagedFile(t.client.cloudStorageClient, t.bucket, t.bucketPath, storeSchema), + target: target, + loadFile: newStagedFile(t.client.cloudStorageClient, t.bucket, t.bucketPath, loadSchema), + storeFile: newStagedFile(t.client.cloudStorageClient, t.bucket, t.bucketPath, storeSchema), + mergeBounds: sql.NewMergeBoundsBuilder(target, dialect.Literal), } for _, m := range []struct { @@ -127,7 +128,6 @@ func (t *transactor) addBinding(target sql.Table, fieldSchemas map[string]*bigqu {&b.tempTableName, tplTempTableName}, {&b.loadQuerySQL, tplLoadQuery}, {&b.storeInsertSQL, tplStoreInsert}, - {&b.storeUpdateSQL, tplStoreUpdate}, } { var err error if *m.sql, err = sql.RenderTableTemplate(target, m.tpl); err != nil { @@ -297,6 +297,7 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) { } else if err = b.storeFile.encodeRow(ctx, converted); err != nil { return nil, fmt.Errorf("encoding Store to scratch file: %w", err) } + b.mergeBounds.NextStore(it.Key) } if it.Err() != nil { return nil, it.Err() @@ -358,7 +359,13 @@ func (t *transactor) commit(ctx context.Context, cleanupFiles []func(context.Con if !b.mustMerge { subqueries = append(subqueries, b.storeInsertSQL) } else { - subqueries = append(subqueries, b.storeUpdateSQL) + if bounds, err := b.mergeBounds.Build(); err != nil { + return fmt.Errorf("building merge bounds: %w", err) + } else if mergeQuery, err := renderMergeQueryTemplate(b.target, bounds); err != nil { + return fmt.Errorf("rendering merge query template: %w", err) + } else { + subqueries = append(subqueries, mergeQuery) + } } // Reset for the next round.