materialize-snowflake: bound merge query ranges on observed keys
Implements the merge query optimization for Snowflake, where the minimum and
maximum observed values for key fields are included in the merge predicate.
williamhbaker committed Jan 2, 2025
1 parent 06228b2 commit 0f7dee5
Showing 6 changed files with 102 additions and 40 deletions.
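
For context on the change: the transactor now tracks the minimum and maximum key values actually observed while staging stored documents, and renders them as literal range predicates alongside the usual equality join in the MERGE's ON clause, so Snowflake can prune micro-partitions of the target table that cannot contain any of the staged keys. Below is a minimal, self-contained sketch of that idea only — it is not the shared sql.MergeBoundsBuilder this commit wires in, and the bound type, its methods, and the integer-key assumption are all invented for illustration.

// Illustrative sketch only: not the shared sql.MergeBoundsBuilder, just the idea.
package main

import "fmt"

// bound tracks the observed range of a single key column across stored rows.
// Integer keys are assumed here for simplicity; the real builder renders
// dialect-specific literals for whatever key types the binding has.
type bound struct {
    identifier   string
    lower, upper int64
    seen         bool
}

// next folds one more observed key value into the running min/max.
func (b *bound) next(v int64) {
    if !b.seen || v < b.lower {
        b.lower = v
    }
    if !b.seen || v > b.upper {
        b.upper = v
    }
    b.seen = true
}

// predicate renders the equality join plus the range restriction, matching the
// shape of the ON clause in the snapshot below.
func (b *bound) predicate() string {
    p := fmt.Sprintf("l.%s = r.%s", b.identifier, b.identifier)
    if b.seen {
        p += fmt.Sprintf(" AND l.%s >= %d AND l.%s <= %d", b.identifier, b.lower, b.identifier, b.upper)
    }
    return p
}

func main() {
    k := bound{identifier: "key1"}
    for _, v := range []int64{42, 10, 100} { // key values observed while staging stores
        k.next(v)
    }
    fmt.Println(k.predicate())
    // l.key1 = r.key1 AND l.key1 >= 10 AND l.key1 <= 100
}
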
35 changes: 19 additions & 16 deletions materialize-snowflake/.snapshots/TestSQLGeneration
@@ -92,6 +92,25 @@ ALTER TABLE "a-schema".key_value ALTER COLUMN
second_required_column DROP NOT NULL;
--- End alter table drop not nulls ---

--- Begin "a-schema".key_value mergeInto ---
MERGE INTO "a-schema".key_value AS l
USING (
SELECT $1[0] AS key1, $1[1] AS key2, $1[2] AS "key!binary", $1[3] AS array, $1[4] AS binary, $1[5] AS boolean, $1[6] AS flow_published_at, $1[7] AS integer, $1[8] AS integerGt64Bit, $1[9] AS integerWithUserDDL, $1[10] AS multiple, $1[11] AS number, $1[12] AS numberCastToString, $1[13] AS object, $1[14] AS string, $1[15] AS stringInteger, $1[16] AS stringInteger39Chars, $1[17] AS stringInteger66Chars, $1[18] AS stringNumber, $1[19] AS flow_document
FROM test-file
) AS r
ON
l.key1 = r.key1 AND l.key1 >= 10 AND l.key1 <= 100
AND l.key2 = r.key2
AND l."key!binary" = r."key!binary" AND l."key!binary" >= 'aGVsbG8K' AND l."key!binary" <= 'Z29vZGJ5ZQo='
WHEN MATCHED AND r.flow_document='delete' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET l.array = r.array, l.binary = r.binary, l.boolean = r.boolean, l.flow_published_at = r.flow_published_at, l.integer = r.integer, l.integerGt64Bit = r.integerGt64Bit, l.integerWithUserDDL = r.integerWithUserDDL, l.multiple = r.multiple, l.number = r.number, l.numberCastToString = r.numberCastToString, l.object = r.object, l.string = r.string, l.stringInteger = r.stringInteger, l.stringInteger39Chars = r.stringInteger39Chars, l.stringInteger66Chars = r.stringInteger66Chars, l.stringNumber = r.stringNumber, l.flow_document = r.flow_document
WHEN NOT MATCHED and r.flow_document!='delete' THEN
INSERT (key1, key2, "key!binary", array, binary, boolean, flow_published_at, integer, integerGt64Bit, integerWithUserDDL, multiple, number, numberCastToString, object, string, stringInteger, stringInteger39Chars, stringInteger66Chars, stringNumber, flow_document)
VALUES (r.key1, r.key2, r."key!binary", r.array, r.binary, r.boolean, r.flow_published_at, r.integer, r.integerGt64Bit, r.integerWithUserDDL, r.multiple, r.number, r.numberCastToString, r.object, r.string, r.stringInteger, r.stringInteger39Chars, r.stringInteger66Chars, r.stringNumber, r.flow_document);
--- End "a-schema".key_value mergeInto ---

--- Begin "a-schema".key_value loadQuery ---
SELECT 0, "a-schema".key_value.flow_document
FROM "a-schema".key_value
@@ -111,22 +130,6 @@ COPY INTO "a-schema".key_value (
);
--- End "a-schema".key_value copyInto ---

--- Begin "a-schema".key_value mergeInto ---
MERGE INTO "a-schema".key_value AS l
USING (
SELECT $1[0] AS key1, $1[1] AS key2, $1[2] AS "key!binary", $1[3] AS array, $1[4] AS binary, $1[5] AS boolean, $1[6] AS flow_published_at, $1[7] AS integer, $1[8] AS integerGt64Bit, $1[9] AS integerWithUserDDL, $1[10] AS multiple, $1[11] AS number, $1[12] AS numberCastToString, $1[13] AS object, $1[14] AS string, $1[15] AS stringInteger, $1[16] AS stringInteger39Chars, $1[17] AS stringInteger66Chars, $1[18] AS stringNumber, $1[19] AS flow_document
FROM test-file
) AS r
ON l.key1 = r.key1 AND l.key2 = r.key2 AND l."key!binary" = r."key!binary"
WHEN MATCHED AND r.flow_document='delete' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET l.array = r.array, l.binary = r.binary, l.boolean = r.boolean, l.flow_published_at = r.flow_published_at, l.integer = r.integer, l.integerGt64Bit = r.integerGt64Bit, l.integerWithUserDDL = r.integerWithUserDDL, l.multiple = r.multiple, l.number = r.number, l.numberCastToString = r.numberCastToString, l.object = r.object, l.string = r.string, l.stringInteger = r.stringInteger, l.stringInteger39Chars = r.stringInteger39Chars, l.stringInteger66Chars = r.stringInteger66Chars, l.stringNumber = r.stringNumber, l.flow_document = r.flow_document
WHEN NOT MATCHED and r.flow_document!='delete' THEN
INSERT (key1, key2, "key!binary", array, binary, boolean, flow_published_at, integer, integerGt64Bit, integerWithUserDDL, multiple, number, numberCastToString, object, string, stringInteger, stringInteger39Chars, stringInteger66Chars, stringNumber, flow_document)
VALUES (r.key1, r.key2, r."key!binary", r.array, r.binary, r.boolean, r.flow_published_at, r.integer, r.integerGt64Bit, r.integerWithUserDDL, r.multiple, r.number, r.numberCastToString, r.object, r.string, r.stringInteger, r.stringInteger39Chars, r.stringInteger66Chars, r.stringNumber, r.flow_document);
--- End "a-schema".key_value mergeInto ---

--- Begin "a-schema".delta_updates copyInto ---
COPY INTO "a-schema".delta_updates (
theKey, aValue, flow_published_at
2 changes: 1 addition & 1 deletion materialize-snowflake/VERSION
@@ -1 +1 @@
v3
v4
25 changes: 15 additions & 10 deletions materialize-snowflake/snowflake.go
@@ -293,10 +293,11 @@ type binding struct {
}
// Variables accessed by Prepare, Store, and Commit.
store struct {
stage *stagedFile
mergeInto string
copyInto string
mustMerge bool
stage *stagedFile
mergeInto string
copyInto string
mustMerge bool
mergeBounds *sql.MergeBoundsBuilder
}
}

@@ -306,11 +307,12 @@ func (d *transactor) addBinding(target sql.Table) error {

b.load.stage = newStagedFile(os.TempDir())
b.store.stage = newStagedFile(os.TempDir())
b.store.mergeBounds = sql.NewMergeBoundsBuilder(target, d.ep.Dialect.Literal)

if b.target.DeltaUpdates && d.cfg.Credentials.AuthType == JWT {
var pipeName string
var err error
if pipeName, err = RenderTableShardVersionTemplate(b.target, d._range.KeyBegin, d.version, d.templates.pipeName); err != nil {
if pipeName, err = renderTableShardVersionTemplate(b.target, d._range.KeyBegin, d.version, d.templates.pipeName); err != nil {
return fmt.Errorf("pipeName template: %w", err)
} else {
pipeName = strings.ToUpper(strings.Trim(pipeName, "`"))
@@ -349,7 +351,7 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
// Pass.
} else if dir, err := b.load.stage.flush(); err != nil {
return fmt.Errorf("load.stage(): %w", err)
} else if subqueries[i], err = RenderTableAndFileTemplate(b.target, dir, d.templates.loadQuery); err != nil {
} else if subqueries[i], err = renderTableAndFileTemplate(b.target, dir, d.templates.loadQuery); err != nil {
return fmt.Errorf("loadQuery template: %w", err)
} else {
filesToCleanup = append(filesToCleanup, dir)
@@ -465,6 +467,7 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
} else if err = b.store.stage.encodeRow(converted); err != nil {
return nil, fmt.Errorf("encoding Store to scratch file: %w", err)
}
b.store.mergeBounds.NextStore(it.Key)
}

// Upload the staged files and build a list of merge and copy into queries that need to be run
@@ -485,7 +488,9 @@ }
}

if b.store.mustMerge {
if mergeIntoQuery, err := RenderTableAndFileTemplate(b.target, dir, d.templates.mergeInto); err != nil {
if bounds, err := b.store.mergeBounds.Build(); err != nil {
return nil, fmt.Errorf("building merge bounds: %w", err)
} else if mergeIntoQuery, err := renderMergeQueryTemplate(d.templates.mergeInto, b.target, dir, bounds); err != nil {
return nil, fmt.Errorf("mergeInto template: %w", err)
} else {
d.cp[b.target.StateKey] = &checkpointItem{
@@ -511,7 +516,7 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
// it means if the spec has been updated, we will end up creating a new pipe
if !exists {
log.WithField("name", b.pipeName).Info("store: creating pipe")
if createPipe, err := RenderTableShardVersionTemplate(b.target, d._range.KeyBegin, d.version, d.templates.createPipe); err != nil {
if createPipe, err := renderTableShardVersionTemplate(b.target, d._range.KeyBegin, d.version, d.templates.createPipe); err != nil {
return nil, fmt.Errorf("createPipe template: %w", err)
} else if _, err := d.db.ExecContext(ctx, createPipe); err != nil {
return nil, fmt.Errorf("creating pipe for table %q: %w", b.target.Path, err)
@@ -539,7 +544,7 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
}

} else {
if copyIntoQuery, err := RenderTableAndFileTemplate(b.target, dir, d.templates.copyInto); err != nil {
if copyIntoQuery, err := renderTableAndFileTemplate(b.target, dir, d.templates.copyInto); err != nil {
return nil, fmt.Errorf("copyInto template: %w", err)
} else {
d.cp[b.target.StateKey] = &checkpointItem{
@@ -586,7 +591,7 @@ type copyHistoryRow struct {
}

func (d *transactor) copyHistory(ctx context.Context, tableName string, fileNames []string) ([]copyHistoryRow, error) {
query, err := RenderCopyHistoryTemplate(tableName, fileNames, d.templates.copyHistory)
query, err := renderCopyHistoryTemplate(tableName, fileNames, d.templates.copyHistory)
if err != nil {
return nil, fmt.Errorf("snowpipe: rendering copy history: %w", err)
}
32 changes: 26 additions & 6 deletions materialize-snowflake/sqlgen.go
@@ -280,9 +280,10 @@ USING (
{{- end }}
FROM {{ $.File }}
) AS r
ON {{ range $ind, $key := $.Table.Keys }}
{{- if $ind }} AND {{ end -}}
l.{{ $key.Identifier }} = r.{{ $key.Identifier }}
ON {{ range $ind, $bound := $.Bounds }}
{{ if $ind -}} AND {{ end -}}
l.{{ $bound.Identifier }} = r.{{ $bound.Identifier }}
{{- if $bound.LiteralLower }} AND l.{{ $bound.Identifier }} >= {{ $bound.LiteralLower }} AND l.{{ $bound.Identifier }} <= {{ $bound.LiteralUpper }}{{ end }}
{{- end }}
{{- if $.Table.Document }}
WHEN MATCHED AND r.{{ $.Table.Document.Identifier }}='delete' THEN
@@ -347,7 +348,7 @@ type tableShardVersion struct {
Version string
}

func RenderTableShardVersionTemplate(table sql.Table, shardKeyBegin uint32, version string, tpl *template.Template) (string, error) {
func renderTableShardVersionTemplate(table sql.Table, shardKeyBegin uint32, version string, tpl *template.Template) (string, error) {
var w strings.Builder
var keyBegin = fmt.Sprintf("%08x", shardKeyBegin)
if err := tpl.Execute(&w, &tableShardVersion{Table: table, ShardKeyBegin: keyBegin, Version: version}); err != nil {
@@ -368,7 +369,7 @@ type tableAndFile struct {
File string
}

func RenderTableAndFileTemplate(table sql.Table, file string, tpl *template.Template) (string, error) {
func renderTableAndFileTemplate(table sql.Table, file string, tpl *template.Template) (string, error) {
var w strings.Builder
if err := tpl.Execute(&w, &tableAndFile{Table: table, File: file}); err != nil {
return "", err
@@ -387,7 +388,7 @@ type copyHistory struct {
Files []string
}

func RenderCopyHistoryTemplate(tableName string, files []string, tpl *template.Template) (string, error) {
func renderCopyHistoryTemplate(tableName string, files []string, tpl *template.Template) (string, error) {
var w strings.Builder
if err := tpl.Execute(&w, &copyHistory{TableName: tableName, Files: files}); err != nil {
return "", err
@@ -400,3 +401,22 @@ func RenderCopyHistoryTemplate(tableName string, files []string, tpl *template.T
}).Debug("rendered template")
return s, nil
}

type mergeQueryInput struct {
Table sql.Table
File string
Bounds []sql.MergeBound
}

func renderMergeQueryTemplate(tpl *template.Template, table sql.Table, file string, bounds []sql.MergeBound) (string, error) {
var w strings.Builder
if err := tpl.Execute(&w, &mergeQueryInput{
Table: table,
File: file,
Bounds: bounds,
}); err != nil {
return "", err
}

return w.String(), nil
}
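
To illustrate how the new Bounds input drives the revised mergeInto ON clause, here is a small standalone sketch that executes a trimmed-down copy of that clause with Go's text/template. The MergeBound struct and literal values below are illustrative stand-ins (the real sql.MergeBound and dialect literals live in the shared sql package); a key with no observed bounds, such as a boolean key, falls back to the plain equality join.

package main

import (
    "os"
    "text/template"
)

// MergeBound mirrors the fields the template relies on; empty literal strings
// mean no usable range was observed for that key.
type MergeBound struct {
    Identifier   string
    LiteralLower string
    LiteralUpper string
}

// A trimmed-down copy of the ON clause from the mergeInto template above.
const onClause = `ON {{ range $ind, $bound := $.Bounds }}
{{ if $ind -}} AND {{ end -}}
l.{{ $bound.Identifier }} = r.{{ $bound.Identifier }}
{{- if $bound.LiteralLower }} AND l.{{ $bound.Identifier }} >= {{ $bound.LiteralLower }} AND l.{{ $bound.Identifier }} <= {{ $bound.LiteralUpper }}{{ end }}
{{- end }}
`

func main() {
    tpl := template.Must(template.New("on").Parse(onClause))
    input := struct{ Bounds []MergeBound }{Bounds: []MergeBound{
        {Identifier: "key1", LiteralLower: "10", LiteralUpper: "100"},
        {Identifier: "key2"}, // no bounds observed: equality join only
    }}
    if err := tpl.Execute(os.Stdout, input); err != nil {
        panic(err)
    }
}
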
36 changes: 35 additions & 1 deletion materialize-snowflake/sqlgen_test.go
@@ -34,10 +34,44 @@ func TestSQLGeneration(t *testing.T) {
},
)

{
tpl := templates.mergeInto
tbl := tables[0]
require.False(t, tbl.DeltaUpdates)
var testcase = tbl.Identifier + " " + tpl.Name()

bounds := []sql.MergeBound{
{
Identifier: tbl.Keys[0].Identifier,
LiteralLower: testDialect.Literal(int64(10)),
LiteralUpper: testDialect.Literal(int64(100)),
},
{
Identifier: tbl.Keys[1].Identifier,
// No bounds - as would be the case for a boolean key, which
// would be a very weird key, but technically allowed.
},
{
Identifier: tbl.Keys[2].Identifier,
LiteralLower: testDialect.Literal("aGVsbG8K"),
LiteralUpper: testDialect.Literal("Z29vZGJ5ZQo="),
},
}

tf := mergeQueryInput{
Table: tbl,
File: "test-file",
Bounds: bounds,
}

snap.WriteString("--- Begin " + testcase + " ---")
require.NoError(t, tpl.Execute(snap, &tf))
snap.WriteString("--- End " + testcase + " ---\n\n")
}

for _, tpl := range []*template.Template{
templates.loadQuery,
templates.copyInto,
templates.mergeInto,
} {
tbl := tables[0]
require.False(t, tbl.DeltaUpdates)
