Skip to content

Commit

Permalink
Fixes the pg_dump issues with create functions (#980)
Browse files Browse the repository at this point in the history
* Fixes the pg_dump issues with create functions

* Fail fast and warn users if statesync is enabled on the already initialized db
  • Loading branch information
charithabandi authored Sep 16, 2024
1 parent 7d5bc36 commit 51cfe56
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 18 deletions.
20 changes: 12 additions & 8 deletions cmd/kwil-admin/cmds/snapshot/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
defer pgDumpOutput.Close()

hasher := sha256.New()
var inVotersBlock bool
var inVotersBlock, schemaStarted bool
var validatorCount int64
genCfg := chain.DefaultGenesisConfig()
genCfg.Alloc = make(map[string]*big.Int)
Expand All @@ -198,6 +198,7 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,

for scanner.Scan() {
line := scanner.Text()
trimLine := strings.TrimSpace(line)

// Remove whitespaces, set and select statements, process voters table
if inVotersBlock {
Expand Down Expand Up @@ -234,17 +235,20 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
})
validatorCount++
} else {
if line == "" || strings.TrimSpace(line) == "" { // Skip empty lines
if line == "" || trimLine == "" { // Skip empty lines
continue
} else if strings.HasPrefix(line, "--") { // Skip comments
} else if strings.HasPrefix(trimLine, "--") { // Skip comments
continue
} else if strings.HasPrefix(line, "SET") || strings.HasPrefix(line, "SELECT") || strings.HasPrefix(line[1:], "connect") {
// Skip SET and SELECT and connect statements
} else if !schemaStarted && (strings.HasPrefix(trimLine, "SET") || strings.HasPrefix(trimLine, "SELECT") || strings.HasPrefix(trimLine, "\\connect") || strings.HasPrefix(trimLine, "CREATE DATABASE")) {
// Skip SET and SELECT and connect and create database statements
continue
} else if strings.HasPrefix(line, `CREATE DATABASE `) {
// Skip CREATE DATABASE statement
} else {
if strings.HasPrefix(line, "COPY kwild_voting.voters") && strings.Contains(line, "FROM stdin;") {
// Start of schema
if !schemaStarted && (strings.HasPrefix(trimLine, "CREATE SCHEMA") || strings.HasPrefix(trimLine, "CREATE TABLE") || strings.HasPrefix(trimLine, "CREATE FUNCTION")) {
schemaStarted = true
}

if strings.HasPrefix(trimLine, "COPY kwild_voting.voters") && strings.Contains(trimLine, "FROM stdin;") {
inVotersBlock = true
}

Expand Down
34 changes: 24 additions & 10 deletions internal/statesync/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const (
stage1output = "stage1output.sql"
stage2output = "stage2output.sql"
stage3output = "stage3output.sql.gz"

CreateSchema = "CREATE SCHEMA"
CreateTable = "CREATE TABLE"
CreateFunction = "CREATE FUNCTION"
)

// This file deals with creating a snapshot instance at a given snapshotID
Expand Down Expand Up @@ -209,14 +213,15 @@ func (s *Snapshotter) sanitizeDump(height uint64, format uint32) ([]byte, error)
scanner := bufio.NewScanner(dumpInst1)
scanner.Buffer(buf, s.maxRowSize)

var inCopyBlock bool
var inCopyBlock, schemaStarted bool
var lineHashes []hashedLine
var offset int64
hasher := sha256.New()

for scanner.Scan() {
line := scanner.Text()
numBytes := int64(len(line)) + 1 // +1 for newline character
trimLine := strings.TrimSpace(line)

if inCopyBlock {
/*
Expand All @@ -226,7 +231,7 @@ func (s *Snapshotter) sanitizeDump(height uint64, format uint32) ([]byte, error)
3 entry3
\.
*/
if line == "\\." { // end of COPY block
if trimLine == "\\." { // end of COPY block
inCopyBlock = false

// Inline sort the lineHashes array based on the row hash
Expand Down Expand Up @@ -278,21 +283,30 @@ func (s *Snapshotter) sanitizeDump(height uint64, format uint32) ([]byte, error)
offset += numBytes
}
} else {
offset += int64(len(line)) + 1 // +1 for newline character
if line == "" || strings.TrimSpace(line) == "" {
offset += numBytes // +1 for newline character
if line == "" || trimLine == "" {
// skip empty lines
continue
} else if strings.HasPrefix(line, "--") {
} else if strings.HasPrefix(trimLine, "--") {
// skip comments
continue
} else if strings.HasPrefix(line, "SET") || strings.HasPrefix(line, "SELECT") ||
(len(line) > 1 && strings.HasPrefix(line[1:], "connect")) ||
strings.HasPrefix(line, "CREATE DATABASE") {
// skip SET, SELECT, CREATE DATABASE and connect statements
} else if !schemaStarted && (strings.HasPrefix(trimLine, CreateSchema) ||
strings.HasPrefix(trimLine, CreateTable) || strings.HasPrefix(trimLine, CreateFunction)) {
schemaStarted = true

// write the line to the output file
_, err := outputFile.WriteString(line + "\n")
if err != nil {
return nil, fmt.Errorf("failed to write to sanitized dump file: %w", err)
}
} else if !schemaStarted && (strings.HasPrefix(trimLine, "SET") || strings.HasPrefix(trimLine, "SELECT") ||
strings.HasPrefix(trimLine, "\\connect") || strings.HasPrefix(trimLine, "CREATE DATABASE")) {
// skip any SET, SELECT, CREATE DATABASE and connect statements that appear before the schema definition
// These are postgres specific commands that should not be included in the snapshot
continue
} else {
// Example: COPY kwild_voting.voters (id, name, power) FROM stdin;
if strings.HasPrefix(line, "COPY") && strings.Contains(line, "FROM stdin;") {
if strings.HasPrefix(trimLine, "COPY") && strings.Contains(trimLine, "FROM stdin;") {
inCopyBlock = true // start of COPY block
}
// write the line to the output file
Expand Down

0 comments on commit 51cfe56

Please sign in to comment.