From e1dd595181751c54fba5ce4b0d45ff05b20aad37 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 19 Mar 2021 00:17:02 -0400 Subject: [PATCH] importccl: log all ignored pgdump stmts in different log files Previously, we would only log the first 10 ignored stmts, and skip over logging the rest. This was done due to concerns about memory usage when buffering, as ExternalStorage does not support streaming writes (yet). This change writes a new file every time we have buffered 10 ignored stmts. The files are written in the subdirectory `import/(unsupported_schema_stmts|unsupported_data_stmts)/.log` This allows for several imports to also log unsupported stmts to the same user provided location without stepping on each others toes. Release note (bug fix): log all unsupported pgdump statements across smaller log files that can be found in the subdir `import/(unsupported_schema_stmts|unsupported_data_stmts)/.log` --- pkg/ccl/importccl/import_processor.go | 3 +- pkg/ccl/importccl/import_stmt.go | 92 +++++++++++++-------- pkg/ccl/importccl/import_stmt_test.go | 66 ++++++++------- pkg/ccl/importccl/read_import_pgdump.go | 102 +++++++++++++++++------- 4 files changed, 175 insertions(+), 88 deletions(-) diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go index 9c38cdc7bbb0..6881322b5124 100644 --- a/pkg/ccl/importccl/import_processor.go +++ b/pkg/ccl/importccl/import_processor.go @@ -233,7 +233,8 @@ func makeInputConverter( return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos, int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx) case roachpb.IOFileFormat_PgDump: - return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx) + return newPgDumpReader(ctx, int64(spec.Progress.JobID), kvCh, spec.Format.PgDump, + spec.WalltimeNanos, spec.Tables, evalCtx) case roachpb.IOFileFormat_Avro: return newAvroInputReader( kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos, diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index dbb651e553be..4a1fa8c77363 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -15,6 +15,7 @@ import ( "io/ioutil" "math" "net/url" + "path" "sort" "strconv" "strings" @@ -108,7 +109,6 @@ const ( pgDumpIgnoreShuntFileDest = "log_ignored_statements" pgDumpUnsupportedSchemaStmtLog = "unsupported_schema_stmts" pgDumpUnsupportedDataStmtLog = "unsupported_data_stmts" - pgDumpMaxLoggedStmts = 10 // RunningStatusImportBundleParseSchema indicates to the user that a bundle format // schema is being parsed @@ -150,6 +150,16 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{ pgDumpIgnoreShuntFileDest: sql.KVStringOptRequireValue, } +var pgDumpMaxLoggedStmts = 1024 + +func testingSetMaxLogIgnoredImportStatements(maxLogSize int) (cleanup func()) { + prevLogSize := pgDumpMaxLoggedStmts + pgDumpMaxLoggedStmts = maxLogSize + return func() { + pgDumpMaxLoggedStmts = prevLogSize + } +} + func makeStringSet(opts ...string) map[string]struct{} { res := make(map[string]struct{}, len(opts)) for _, opt := range opts { @@ -1572,6 +1582,10 @@ const ( // unsupportedStmtLogger is responsible for handling unsupported PGDUMP SQL // statements seen during the import. type unsupportedStmtLogger struct { + ctx context.Context + user security.SQLUsername + jobID int64 + // Values are initialized based on the options specified in the IMPORT PGDUMP // stmt. ignoreUnsupported bool @@ -1582,79 +1596,88 @@ type unsupportedStmtLogger struct { logBuffer *bytes.Buffer numIgnoredStmts int + // Incremented every time the logger flushes. It is used as the suffix of the + // log file written to external storage. + flushCount int + loggerType loggerKind } func makeUnsupportedStmtLogger( + ctx context.Context, + user security.SQLUsername, + jobID int64, ignoreUnsupported bool, unsupportedLogDest string, loggerType loggerKind, externalStorage cloud.ExternalStorageFactory, ) *unsupportedStmtLogger { - l := &unsupportedStmtLogger{ + return &unsupportedStmtLogger{ + ctx: ctx, + user: user, + jobID: jobID, ignoreUnsupported: ignoreUnsupported, ignoreUnsupportedLogDest: unsupportedLogDest, loggerType: loggerType, logBuffer: new(bytes.Buffer), externalStorage: externalStorage, } - header := "Unsupported statements during schema parse phase:\n\n" - if loggerType == dataIngestion { - header = "Unsupported statements during data ingestion phase:\n\n" - } - l.logBuffer.WriteString(header) - return l } -func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) { +func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) error { // We have already logged parse errors during the schema ingestion phase, so // skip them to avoid duplicate entries. skipLoggingParseErr := isParseError && u.loggerType == dataIngestion if u.ignoreUnsupportedLogDest == "" || skipLoggingParseErr { - return + return nil } - if u.numIgnoredStmts < pgDumpMaxLoggedStmts { - if isParseError { - logLine = fmt.Sprintf("%s: could not be parsed\n", logLine) - } else { - logLine = fmt.Sprintf("%s: unsupported by IMPORT\n", logLine) + // Flush to a file if we have hit the max size of our buffer. + if u.numIgnoredStmts >= pgDumpMaxLoggedStmts { + err := u.flush() + if err != nil { + return err } - u.logBuffer.Write([]byte(logLine)) } + + if isParseError { + logLine = fmt.Sprintf("%s: could not be parsed\n", logLine) + } else { + logLine = fmt.Sprintf("%s: unsupported by IMPORT\n", logLine) + } + u.logBuffer.Write([]byte(logLine)) u.numIgnoredStmts++ + return nil } -func (u *unsupportedStmtLogger) flush(ctx context.Context, user security.SQLUsername) error { +func (u *unsupportedStmtLogger) flush() error { if u.ignoreUnsupportedLogDest == "" { return nil } - numLoggedStmts := pgDumpMaxLoggedStmts - if u.numIgnoredStmts < pgDumpMaxLoggedStmts { - numLoggedStmts = u.numIgnoredStmts - } - u.logBuffer.WriteString(fmt.Sprintf("\nLogging %d out of %d ignored statements.\n", - numLoggedStmts, u.numIgnoredStmts)) - - conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, user) + conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, u.user) if err != nil { return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP") } var s cloud.ExternalStorage - if s, err = u.externalStorage(ctx, conf); err != nil { + if s, err = u.externalStorage(u.ctx, conf); err != nil { return errors.New("failed to log unsupported stmts during IMPORT PGDUMP") } defer s.Close() - logFileName := pgDumpUnsupportedSchemaStmtLog + logFileName := fmt.Sprintf("import%d", u.jobID) if u.loggerType == dataIngestion { - logFileName = pgDumpUnsupportedDataStmtLog + logFileName = path.Join(logFileName, pgDumpUnsupportedDataStmtLog, fmt.Sprintf("%d.log", u.flushCount)) + } else { + logFileName = path.Join(logFileName, pgDumpUnsupportedSchemaStmtLog, fmt.Sprintf("%d.log", u.flushCount)) } - err = s.WriteFile(ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes())) + err = s.WriteFile(u.ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes())) if err != nil { return errors.Wrap(err, "failed to log unsupported stmts to log during IMPORT PGDUMP") } + u.flushCount++ + u.numIgnoredStmts = 0 + u.logBuffer.Truncate(0) return nil } @@ -1671,6 +1694,7 @@ func parseAndCreateBundleTableDescs( format roachpb.IOFileFormat, walltime int64, owner security.SQLUsername, + jobID jobspb.JobID, ) ([]*tabledesc.Mutable, []*schemadesc.Mutable, error) { var schemaDescs []*schemadesc.Mutable @@ -1714,13 +1738,14 @@ func parseAndCreateBundleTableDescs( evalCtx := &p.ExtendedEvalContext().EvalContext // Setup a logger to handle unsupported DDL statements in the PGDUMP file. - unsupportedStmtLogger := makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported, - format.PgDump.IgnoreUnsupportedLog, schemaParsing, p.ExecCfg().DistSQLSrv.ExternalStorage) + unsupportedStmtLogger := makeUnsupportedStmtLogger(ctx, p.User(), int64(jobID), + format.PgDump.IgnoreUnsupported, format.PgDump.IgnoreUnsupportedLog, schemaParsing, + p.ExecCfg().DistSQLSrv.ExternalStorage) tableDescs, schemaDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner, unsupportedStmtLogger) - logErr := unsupportedStmtLogger.flush(ctx, p.User()) + logErr := unsupportedStmtLogger.flush() if logErr != nil { return nil, nil, logErr } @@ -1765,7 +1790,8 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter walltime := p.ExecCfg().Clock.Now().WallTime if tableDescs, schemaDescs, err = parseAndCreateBundleTableDescs( - ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime, owner); err != nil { + ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime, owner, + r.job.ID()); err != nil { return err } diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 884ab8090010..412a2412164f 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -20,6 +20,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "path" "path/filepath" "regexp" "strings" @@ -5788,7 +5789,7 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { t.Run("ignore-unsupported", func(t *testing.T) { sqlDB.Exec(t, "CREATE DATABASE foo; USE foo;") sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements", srv.URL) - // Check that statements which are not expected to be ignored, are still + // Check that statements that are not expected to be ignored, are still // processed. sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}}) }) @@ -5800,16 +5801,20 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { t.Run("require-both-unsupported-options", func(t *testing.T) { sqlDB.Exec(t, "CREATE DATABASE foo2; USE foo2;") - ignoredLog := `userfile:///ignore.log` + ignoredLog := `userfile:///ignore` sqlDB.ExpectErr(t, "cannot log unsupported PGDUMP stmts without `ignore_unsupported_statements` option", "IMPORT PGDUMP ($1) WITH log_ignored_statements=$2", srv.URL, ignoredLog) }) t.Run("log-unsupported-stmts", func(t *testing.T) { sqlDB.Exec(t, "CREATE DATABASE foo3; USE foo3;") - ignoredLog := `userfile:///ignore.log` - sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements, log_ignored_statements=$2", - srv.URL, ignoredLog) + ignoredLog := `userfile:///ignore` + defer testingSetMaxLogIgnoredImportStatements(10 /* maxLogSize */)() + var importJobID int + var unused interface{} + sqlDB.QueryRow(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements, "+ + "log_ignored_statements=$2", srv.URL, ignoredLog).Scan(&importJobID, &unused, &unused, + &unused, &unused, &unused) // Check that statements which are not expected to be ignored, are still // processed. sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}}) @@ -5823,13 +5828,24 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB()) require.NoError(t, err) defer store.Close() - content, err := store.ReadFile(ctx, pgDumpUnsupportedSchemaStmtLog) - require.NoError(t, err) - descBytes, err := ioutil.ReadAll(content) - require.NoError(t, err) - expectedSchemaLog := `Unsupported statements during schema parse phase: -create trigger: could not be parsed + // We expect there to be two log files since we have 13 unsupported statements. + dirName := fmt.Sprintf("import%d", importJobID) + checkFiles := func(expectedFileContent []string, logSubdir string) { + files, err := store.ListFiles(ctx, fmt.Sprintf("*/%s/*", logSubdir)) + require.NoError(t, err) + for i, file := range files { + require.Equal(t, file, path.Join(dirName, logSubdir, fmt.Sprintf("%d.log", i))) + content, err := store.ReadFile(ctx, file) + require.NoError(t, err) + descBytes, err := ioutil.ReadAll(content) + require.NoError(t, err) + require.Equal(t, []byte(expectedFileContent[i]), descBytes) + } + } + + schemaFileContents := []string{ + `create trigger: could not be parsed revoke privileges on sequence: could not be parsed revoke privileges on sequence: could not be parsed grant privileges on sequence: could not be parsed @@ -5839,24 +5855,20 @@ create extension if not exists with: could not be parsed create function: could not be parsed alter function: could not be parsed COMMENT ON TABLE t IS 'This should be skipped': unsupported by IMPORT +`, + `COMMENT ON DATABASE t IS 'This should be skipped': unsupported by IMPORT +COMMENT ON COLUMN t IS 'This should be skipped': unsupported by IMPORT +unsupported function call: set_config in stmt: SELECT set_config('search_path', '', false): unsupported by IMPORT +`, + } + checkFiles(schemaFileContents, pgDumpUnsupportedSchemaStmtLog) -Logging 10 out of 13 ignored statements. -` - require.Equal(t, []byte(expectedSchemaLog), descBytes) - - expectedDataLog := `Unsupported statements during data ingestion phase: - -unsupported 3 fn args in select: ['search_path' '' false]: unsupported by IMPORT + ingestionFileContents := []string{ + `unsupported 3 fn args in select: ['search_path' '' false]: unsupported by IMPORT unsupported *tree.Delete statement: DELETE FROM geometry_columns WHERE (f_table_name = 'nyc_census_blocks') AND (f_table_schema = 'public'): unsupported by IMPORT - -Logging 2 out of 2 ignored statements. -` - - content, err = store.ReadFile(ctx, pgDumpUnsupportedDataStmtLog) - require.NoError(t, err) - descBytes, err = ioutil.ReadAll(content) - require.NoError(t, err) - require.Equal(t, []byte(expectedDataLog), descBytes) +`, + } + checkFiles(ingestionFileContents, pgDumpUnsupportedDataStmtLog) }) } diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index ca2e75668e7d..bb23871975eb 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -111,7 +111,10 @@ func (p *postgreStream) Next() (interface{}, error) { // so here. if p.unsupportedStmtLogger.ignoreUnsupported && errors.HasType(err, (*tree.UnsupportedError)(nil)) { if unsupportedErr := (*tree.UnsupportedError)(nil); errors.As(err, &unsupportedErr) { - p.unsupportedStmtLogger.log(unsupportedErr.FeatureName, true /* isParseError */) + err := p.unsupportedStmtLogger.log(unsupportedErr.FeatureName, true /* isParseError */) + if err != nil { + return nil, err + } } continue } @@ -616,7 +619,10 @@ func readPostgresStmt( case *tree.AlterTableAddColumn: if cmd.IfNotExists { if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + err := unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + if err != nil { + return err + } continue } return errors.Errorf("unsupported statement: %s", stmt) @@ -641,7 +647,10 @@ func readPostgresStmt( } default: if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + err := unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + if err != nil { + return err + } continue } return errors.Errorf("unsupported statement: %s", stmt) @@ -649,8 +658,7 @@ func readPostgresStmt( } case *tree.AlterTableOwner: if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) - return nil + return unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) } return errors.Errorf("unsupported statement: %s", stmt) case *tree.CreateSequence: @@ -663,8 +671,7 @@ func readPostgresStmt( } case *tree.AlterSequence: if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) - return nil + return unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) } return errors.Errorf("unsupported %T statement: %s", stmt, stmt) // Some SELECT statements mutate schema. Search for those here. @@ -693,7 +700,10 @@ func readPostgresStmt( err := errors.Errorf("unsupported function call: %s in stmt: %s", expr.Func.String(), stmt.String()) if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -735,7 +745,10 @@ func readPostgresStmt( default: err := errors.Errorf("unsupported %T SELECT expr: %s", expr, expr) if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -744,7 +757,10 @@ func readPostgresStmt( default: err := errors.Errorf("unsupported %T SELECT %s", sel, sel) if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } return nil } return err @@ -790,8 +806,7 @@ func readPostgresStmt( // - ANALYZE is syntactic sugar for CreateStatistics. It can be ignored // because the auto stats stuff will pick up the changes and run if needed. if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) - return nil + return unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) } return errors.Errorf("unsupported %T statement: %s", stmt, stmt) case error: @@ -800,8 +815,7 @@ func readPostgresStmt( } default: if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) - return nil + return unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) } return errors.Errorf("unsupported %T statement: %s", stmt, stmt) } @@ -845,6 +859,7 @@ type pgDumpReader struct { opts roachpb.PgDumpOptions walltime int64 colMap map[*row.DatumRowConverter](map[string]int) + jobID int64 unsupportedStmtLogger *unsupportedStmtLogger evalCtx *tree.EvalContext } @@ -854,6 +869,7 @@ var _ inputConverter = &pgDumpReader{} // newPgDumpReader creates a new inputConverter for pg_dump files. func newPgDumpReader( ctx context.Context, + jobID int64, kvCh chan row.KVBatch, opts roachpb.PgDumpOptions, walltime int64, @@ -895,6 +911,7 @@ func newPgDumpReader( opts: opts, walltime: walltime, colMap: colMap, + jobID: jobID, evalCtx: evalCtx, }, nil } @@ -911,15 +928,16 @@ func (m *pgDumpReader) readFiles( user security.SQLUsername, ) error { // Setup logger to handle unsupported DML statements seen in the PGDUMP file. - m.unsupportedStmtLogger = makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported, - format.PgDump.IgnoreUnsupportedLog, dataIngestion, makeExternalStorage) + m.unsupportedStmtLogger = makeUnsupportedStmtLogger(ctx, user, + m.jobID, format.PgDump.IgnoreUnsupported, format.PgDump.IgnoreUnsupportedLog, dataIngestion, + makeExternalStorage) err := readInputFiles(ctx, dataFiles, resumePos, format, m.readFile, makeExternalStorage, user) if err != nil { return err } - return m.unsupportedStmtLogger.flush(ctx, user) + return m.unsupportedStmtLogger.flush() } func (m *pgDumpReader) readFile( @@ -976,7 +994,10 @@ func (m *pgDumpReader) readFile( if m.unsupportedStmtLogger.ignoreUnsupported { logLine := fmt.Sprintf("%s: unsupported by IMPORT\n", i.Rows.Select.String()) - m.unsupportedStmtLogger.log(logLine, false /* isParseError */) + err := m.unsupportedStmtLogger.log(logLine, false /* isParseError */) + if err != nil { + return err + } continue } return errors.Errorf("unsupported: %s", i.Rows.Select) @@ -1127,7 +1148,10 @@ func (m *pgDumpReader) readFile( if !ok { err := errors.Errorf("unsupported %T Select: %v", i.Select, i.Select) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1135,7 +1159,10 @@ func (m *pgDumpReader) readFile( if len(sc.Exprs) != 1 { err := errors.Errorf("unsupported %d select args: %v", len(sc.Exprs), sc.Exprs) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1144,7 +1171,10 @@ func (m *pgDumpReader) readFile( if !ok { err := errors.Errorf("unsupported select arg %T: %v", sc.Exprs[0].Expr, sc.Exprs[0].Expr) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1154,7 +1184,10 @@ func (m *pgDumpReader) readFile( case "search_path", "pg_catalog.set_config": err := errors.Errorf("unsupported %d fn args in select: %v", len(fn.Exprs), fn.Exprs) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1162,7 +1195,10 @@ func (m *pgDumpReader) readFile( if args := len(fn.Exprs); args < 2 || args > 3 { err := errors.Errorf("unsupported %d fn args in select: %v", len(fn.Exprs), fn.Exprs) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1183,7 +1219,10 @@ func (m *pgDumpReader) readFile( if !ok { err := errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[1], fn.Exprs[1]) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1198,7 +1237,10 @@ func (m *pgDumpReader) readFile( if !ok { err := errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[2], fn.Exprs[2]) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1232,7 +1274,10 @@ func (m *pgDumpReader) readFile( default: err := errors.Errorf("unsupported function %s in stmt %s", funcName, i.Select.String()) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1248,7 +1293,10 @@ func (m *pgDumpReader) readFile( default: err := errors.Errorf("unsupported %T statement: %v", i, i) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err