From 51cfe56c90408ab6368aec6dbfd733b5573bc3bf Mon Sep 17 00:00:00 2001 From: Charitha Bandi <45089429+charithabandi@users.noreply.github.com> Date: Mon, 16 Sep 2024 15:57:26 -0500 Subject: [PATCH] Fixes the pg_dump issues with create functions (#980) * Fixes the pg_dump issues with create functions * Fail fast and warn users if statesync is enabled on the already initialized db --- cmd/kwil-admin/cmds/snapshot/create.go | 20 +++++++++------ internal/statesync/snapshotter.go | 34 ++++++++++++++++++-------- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/cmd/kwil-admin/cmds/snapshot/create.go b/cmd/kwil-admin/cmds/snapshot/create.go index 321912b4b..bebef9e8f 100644 --- a/cmd/kwil-admin/cmds/snapshot/create.go +++ b/cmd/kwil-admin/cmds/snapshot/create.go @@ -184,7 +184,7 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string, defer pgDumpOutput.Close() hasher := sha256.New() - var inVotersBlock bool + var inVotersBlock, schemaStarted bool var validatorCount int64 genCfg := chain.DefaultGenesisConfig() genCfg.Alloc = make(map[string]*big.Int) @@ -198,6 +198,7 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string, for scanner.Scan() { line := scanner.Text() + trimLine := strings.TrimSpace(line) // Remove whitespaces, set and select statements, process voters table if inVotersBlock { @@ -234,17 +235,20 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string, }) validatorCount++ } else { - if line == "" || strings.TrimSpace(line) == "" { // Skip empty lines + if line == "" || trimLine == "" { // Skip empty lines continue - } else if strings.HasPrefix(line, "--") { // Skip comments + } else if strings.HasPrefix(trimLine, "--") { // Skip comments continue - } else if strings.HasPrefix(line, "SET") || strings.HasPrefix(line, "SELECT") || strings.HasPrefix(line[1:], "connect") { - // Skip SET and SELECT and connect statements + } else if !schemaStarted && (strings.HasPrefix(trimLine, "SET") || strings.HasPrefix(trimLine, "SELECT") || strings.HasPrefix(trimLine, "\\connect") || strings.HasPrefix(trimLine, "CREATE DATABASE")) { + // Skip SET and SELECT and connect and create database statements continue - } else if strings.HasPrefix(line, `CREATE DATABASE `) { - // Skip CREATE DATABASE statement } else { - if strings.HasPrefix(line, "COPY kwild_voting.voters") && strings.Contains(line, "FROM stdin;") { + // Start of schema + if !schemaStarted && (strings.HasPrefix(trimLine, "CREATE SCHEMA") || strings.HasPrefix(trimLine, "CREATE TABLE") || strings.HasPrefix(trimLine, "CREATE FUNCTION")) { + schemaStarted = true + } + + if strings.HasPrefix(trimLine, "COPY kwild_voting.voters") && strings.Contains(trimLine, "FROM stdin;") { inVotersBlock = true } diff --git a/internal/statesync/snapshotter.go b/internal/statesync/snapshotter.go index 909df5189..03163f628 100644 --- a/internal/statesync/snapshotter.go +++ b/internal/statesync/snapshotter.go @@ -26,6 +26,10 @@ const ( stage1output = "stage1output.sql" stage2output = "stage2output.sql" stage3output = "stage3output.sql.gz" + + CreateSchema = "CREATE SCHEMA" + CreateTable = "CREATE TABLE" + CreateFunction = "CREATE FUNCTION" ) // This file deals with creating a snapshot instance at a given snapshotID @@ -209,7 +213,7 @@ func (s *Snapshotter) sanitizeDump(height uint64, format uint32) ([]byte, error) scanner := bufio.NewScanner(dumpInst1) scanner.Buffer(buf, s.maxRowSize) - var inCopyBlock bool + var inCopyBlock, schemaStarted bool var lineHashes []hashedLine var offset int64 hasher := sha256.New() @@ -217,6 +221,7 @@ func (s *Snapshotter) sanitizeDump(height uint64, format uint32) ([]byte, error) for scanner.Scan() { line := scanner.Text() numBytes := int64(len(line)) + 1 // +1 for newline character + trimLine := strings.TrimSpace(line) if inCopyBlock { /* @@ -226,7 +231,7 @@ func (s *Snapshotter) sanitizeDump(height uint64, format uint32) ([]byte, error) 3 entry3 \. */ - if line == "\\." { // end of COPY block + if trimLine == "\\." { // end of COPY block inCopyBlock = false // Inline sort the lineHashes array based on the row hash @@ -278,21 +283,30 @@ func (s *Snapshotter) sanitizeDump(height uint64, format uint32) ([]byte, error) offset += numBytes } } else { - offset += int64(len(line)) + 1 // +1 for newline character - if line == "" || strings.TrimSpace(line) == "" { + offset += numBytes // +1 for newline character + if line == "" || trimLine == "" { // skip empty lines continue - } else if strings.HasPrefix(line, "--") { + } else if strings.HasPrefix(trimLine, "--") { // skip comments continue - } else if strings.HasPrefix(line, "SET") || strings.HasPrefix(line, "SELECT") || - (len(line) > 1 && strings.HasPrefix(line[1:], "connect")) || - strings.HasPrefix(line, "CREATE DATABASE") { - // skip SET, SELECT, CREATE DATABASE and connect statements + } else if !schemaStarted && (strings.HasPrefix(trimLine, CreateSchema) || + strings.HasPrefix(trimLine, CreateTable) || strings.HasPrefix(trimLine, CreateFunction)) { + schemaStarted = true + + // write the line to the output file + _, err := outputFile.WriteString(line + "\n") + if err != nil { + return nil, fmt.Errorf("failed to write to sanitized dump file: %w", err) + } + } else if !schemaStarted && (strings.HasPrefix(trimLine, "SET") || strings.HasPrefix(trimLine, "SELECT") || + strings.HasPrefix(trimLine, "\\connect") || strings.HasPrefix(trimLine, "CREATE DATABASE")) { + // skip any SET, SELECT, CREATE DATABASE and connect statements that appear before the schema definition + // These are postgres specific commands that should not be included in the snapshot continue } else { // Example: COPY kwild_voting.voters (id, name, power) FROM stdin; - if strings.HasPrefix(line, "COPY") && strings.Contains(line, "FROM stdin;") { + if strings.HasPrefix(trimLine, "COPY") && strings.Contains(trimLine, "FROM stdin;") { inCopyBlock = true // start of COPY block } // write the line to the output file