diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go index 9c38cdc7bbb0..6881322b5124 100644 --- a/pkg/ccl/importccl/import_processor.go +++ b/pkg/ccl/importccl/import_processor.go @@ -233,7 +233,8 @@ func makeInputConverter( return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos, int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx) case roachpb.IOFileFormat_PgDump: - return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx) + return newPgDumpReader(ctx, int64(spec.Progress.JobID), kvCh, spec.Format.PgDump, + spec.WalltimeNanos, spec.Tables, evalCtx) case roachpb.IOFileFormat_Avro: return newAvroInputReader( kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos, diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index dbb651e553be..4a1fa8c77363 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -15,6 +15,7 @@ import ( "io/ioutil" "math" "net/url" + "path" "sort" "strconv" "strings" @@ -108,7 +109,6 @@ const ( pgDumpIgnoreShuntFileDest = "log_ignored_statements" pgDumpUnsupportedSchemaStmtLog = "unsupported_schema_stmts" pgDumpUnsupportedDataStmtLog = "unsupported_data_stmts" - pgDumpMaxLoggedStmts = 10 // RunningStatusImportBundleParseSchema indicates to the user that a bundle format // schema is being parsed @@ -150,6 +150,16 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{ pgDumpIgnoreShuntFileDest: sql.KVStringOptRequireValue, } +var pgDumpMaxLoggedStmts = 1024 + +func testingSetMaxLogIgnoredImportStatements(maxLogSize int) (cleanup func()) { + prevLogSize := pgDumpMaxLoggedStmts + pgDumpMaxLoggedStmts = maxLogSize + return func() { + pgDumpMaxLoggedStmts = prevLogSize + } +} + func makeStringSet(opts ...string) map[string]struct{} { res := make(map[string]struct{}, len(opts)) for _, opt := range opts { @@ -1572,6 +1582,10 @@ const ( // unsupportedStmtLogger is responsible for handling unsupported PGDUMP SQL // statements seen during the import. type unsupportedStmtLogger struct { + ctx context.Context + user security.SQLUsername + jobID int64 + // Values are initialized based on the options specified in the IMPORT PGDUMP // stmt. ignoreUnsupported bool @@ -1582,79 +1596,88 @@ type unsupportedStmtLogger struct { logBuffer *bytes.Buffer numIgnoredStmts int + // Incremented every time the logger flushes. It is used as the suffix of the + // log file written to external storage. + flushCount int + loggerType loggerKind } func makeUnsupportedStmtLogger( + ctx context.Context, + user security.SQLUsername, + jobID int64, ignoreUnsupported bool, unsupportedLogDest string, loggerType loggerKind, externalStorage cloud.ExternalStorageFactory, ) *unsupportedStmtLogger { - l := &unsupportedStmtLogger{ + return &unsupportedStmtLogger{ + ctx: ctx, + user: user, + jobID: jobID, ignoreUnsupported: ignoreUnsupported, ignoreUnsupportedLogDest: unsupportedLogDest, loggerType: loggerType, logBuffer: new(bytes.Buffer), externalStorage: externalStorage, } - header := "Unsupported statements during schema parse phase:\n\n" - if loggerType == dataIngestion { - header = "Unsupported statements during data ingestion phase:\n\n" - } - l.logBuffer.WriteString(header) - return l } -func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) { +func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) error { // We have already logged parse errors during the schema ingestion phase, so // skip them to avoid duplicate entries. skipLoggingParseErr := isParseError && u.loggerType == dataIngestion if u.ignoreUnsupportedLogDest == "" || skipLoggingParseErr { - return + return nil } - if u.numIgnoredStmts < pgDumpMaxLoggedStmts { - if isParseError { - logLine = fmt.Sprintf("%s: could not be parsed\n", logLine) - } else { - logLine = fmt.Sprintf("%s: unsupported by IMPORT\n", logLine) + // Flush to a file if we have hit the max size of our buffer. + if u.numIgnoredStmts >= pgDumpMaxLoggedStmts { + err := u.flush() + if err != nil { + return err } - u.logBuffer.Write([]byte(logLine)) } + + if isParseError { + logLine = fmt.Sprintf("%s: could not be parsed\n", logLine) + } else { + logLine = fmt.Sprintf("%s: unsupported by IMPORT\n", logLine) + } + u.logBuffer.Write([]byte(logLine)) u.numIgnoredStmts++ + return nil } -func (u *unsupportedStmtLogger) flush(ctx context.Context, user security.SQLUsername) error { +func (u *unsupportedStmtLogger) flush() error { if u.ignoreUnsupportedLogDest == "" { return nil } - numLoggedStmts := pgDumpMaxLoggedStmts - if u.numIgnoredStmts < pgDumpMaxLoggedStmts { - numLoggedStmts = u.numIgnoredStmts - } - u.logBuffer.WriteString(fmt.Sprintf("\nLogging %d out of %d ignored statements.\n", - numLoggedStmts, u.numIgnoredStmts)) - - conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, user) + conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, u.user) if err != nil { return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP") } var s cloud.ExternalStorage - if s, err = u.externalStorage(ctx, conf); err != nil { + if s, err = u.externalStorage(u.ctx, conf); err != nil { return errors.New("failed to log unsupported stmts during IMPORT PGDUMP") } defer s.Close() - logFileName := pgDumpUnsupportedSchemaStmtLog + logFileName := fmt.Sprintf("import%d", u.jobID) if u.loggerType == dataIngestion { - logFileName = pgDumpUnsupportedDataStmtLog + logFileName = path.Join(logFileName, pgDumpUnsupportedDataStmtLog, fmt.Sprintf("%d.log", u.flushCount)) + } else { + logFileName = path.Join(logFileName, pgDumpUnsupportedSchemaStmtLog, fmt.Sprintf("%d.log", u.flushCount)) } - err = s.WriteFile(ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes())) + err = s.WriteFile(u.ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes())) if err != nil { return errors.Wrap(err, "failed to log unsupported stmts to log during IMPORT PGDUMP") } + u.flushCount++ + u.numIgnoredStmts = 0 + u.logBuffer.Truncate(0) return nil } @@ -1671,6 +1694,7 @@ func parseAndCreateBundleTableDescs( format roachpb.IOFileFormat, walltime int64, owner security.SQLUsername, + jobID jobspb.JobID, ) ([]*tabledesc.Mutable, []*schemadesc.Mutable, error) { var schemaDescs []*schemadesc.Mutable @@ -1714,13 +1738,14 @@ func parseAndCreateBundleTableDescs( evalCtx := &p.ExtendedEvalContext().EvalContext // Setup a logger to handle unsupported DDL statements in the PGDUMP file. - unsupportedStmtLogger := makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported, - format.PgDump.IgnoreUnsupportedLog, schemaParsing, p.ExecCfg().DistSQLSrv.ExternalStorage) + unsupportedStmtLogger := makeUnsupportedStmtLogger(ctx, p.User(), int64(jobID), + format.PgDump.IgnoreUnsupported, format.PgDump.IgnoreUnsupportedLog, schemaParsing, + p.ExecCfg().DistSQLSrv.ExternalStorage) tableDescs, schemaDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner, unsupportedStmtLogger) - logErr := unsupportedStmtLogger.flush(ctx, p.User()) + logErr := unsupportedStmtLogger.flush() if logErr != nil { return nil, nil, logErr } @@ -1765,7 +1790,8 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter walltime := p.ExecCfg().Clock.Now().WallTime if tableDescs, schemaDescs, err = parseAndCreateBundleTableDescs( - ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime, owner); err != nil { + ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime, owner, + r.job.ID()); err != nil { return err } diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 884ab8090010..412a2412164f 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -20,6 +20,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "path" "path/filepath" "regexp" "strings" @@ -5788,7 +5789,7 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { t.Run("ignore-unsupported", func(t *testing.T) { sqlDB.Exec(t, "CREATE DATABASE foo; USE foo;") sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements", srv.URL) - // Check that statements which are not expected to be ignored, are still + // Check that statements that are not expected to be ignored, are still // processed. sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}}) }) @@ -5800,16 +5801,20 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { t.Run("require-both-unsupported-options", func(t *testing.T) { sqlDB.Exec(t, "CREATE DATABASE foo2; USE foo2;") - ignoredLog := `userfile:///ignore.log` + ignoredLog := `userfile:///ignore` sqlDB.ExpectErr(t, "cannot log unsupported PGDUMP stmts without `ignore_unsupported_statements` option", "IMPORT PGDUMP ($1) WITH log_ignored_statements=$2", srv.URL, ignoredLog) }) t.Run("log-unsupported-stmts", func(t *testing.T) { sqlDB.Exec(t, "CREATE DATABASE foo3; USE foo3;") - ignoredLog := `userfile:///ignore.log` - sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements, log_ignored_statements=$2", - srv.URL, ignoredLog) + ignoredLog := `userfile:///ignore` + defer testingSetMaxLogIgnoredImportStatements(10 /* maxLogSize */)() + var importJobID int + var unused interface{} + sqlDB.QueryRow(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements, "+ + "log_ignored_statements=$2", srv.URL, ignoredLog).Scan(&importJobID, &unused, &unused, + &unused, &unused, &unused) // Check that statements which are not expected to be ignored, are still // processed. sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}}) @@ -5823,13 +5828,24 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB()) require.NoError(t, err) defer store.Close() - content, err := store.ReadFile(ctx, pgDumpUnsupportedSchemaStmtLog) - require.NoError(t, err) - descBytes, err := ioutil.ReadAll(content) - require.NoError(t, err) - expectedSchemaLog := `Unsupported statements during schema parse phase: -create trigger: could not be parsed + // We expect there to be two log files since we have 13 unsupported statements. + dirName := fmt.Sprintf("import%d", importJobID) + checkFiles := func(expectedFileContent []string, logSubdir string) { + files, err := store.ListFiles(ctx, fmt.Sprintf("*/%s/*", logSubdir)) + require.NoError(t, err) + for i, file := range files { + require.Equal(t, file, path.Join(dirName, logSubdir, fmt.Sprintf("%d.log", i))) + content, err := store.ReadFile(ctx, file) + require.NoError(t, err) + descBytes, err := ioutil.ReadAll(content) + require.NoError(t, err) + require.Equal(t, []byte(expectedFileContent[i]), descBytes) + } + } + + schemaFileContents := []string{ + `create trigger: could not be parsed revoke privileges on sequence: could not be parsed revoke privileges on sequence: could not be parsed grant privileges on sequence: could not be parsed @@ -5839,24 +5855,20 @@ create extension if not exists with: could not be parsed create function: could not be parsed alter function: could not be parsed COMMENT ON TABLE t IS 'This should be skipped': unsupported by IMPORT +`, + `COMMENT ON DATABASE t IS 'This should be skipped': unsupported by IMPORT +COMMENT ON COLUMN t IS 'This should be skipped': unsupported by IMPORT +unsupported function call: set_config in stmt: SELECT set_config('search_path', '', false): unsupported by IMPORT +`, + } + checkFiles(schemaFileContents, pgDumpUnsupportedSchemaStmtLog) -Logging 10 out of 13 ignored statements. -` - require.Equal(t, []byte(expectedSchemaLog), descBytes) - - expectedDataLog := `Unsupported statements during data ingestion phase: - -unsupported 3 fn args in select: ['search_path' '' false]: unsupported by IMPORT + ingestionFileContents := []string{ + `unsupported 3 fn args in select: ['search_path' '' false]: unsupported by IMPORT unsupported *tree.Delete statement: DELETE FROM geometry_columns WHERE (f_table_name = 'nyc_census_blocks') AND (f_table_schema = 'public'): unsupported by IMPORT - -Logging 2 out of 2 ignored statements. -` - - content, err = store.ReadFile(ctx, pgDumpUnsupportedDataStmtLog) - require.NoError(t, err) - descBytes, err = ioutil.ReadAll(content) - require.NoError(t, err) - require.Equal(t, []byte(expectedDataLog), descBytes) +`, + } + checkFiles(ingestionFileContents, pgDumpUnsupportedDataStmtLog) }) } diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index ca2e75668e7d..bb23871975eb 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -111,7 +111,10 @@ func (p *postgreStream) Next() (interface{}, error) { // so here. if p.unsupportedStmtLogger.ignoreUnsupported && errors.HasType(err, (*tree.UnsupportedError)(nil)) { if unsupportedErr := (*tree.UnsupportedError)(nil); errors.As(err, &unsupportedErr) { - p.unsupportedStmtLogger.log(unsupportedErr.FeatureName, true /* isParseError */) + err := p.unsupportedStmtLogger.log(unsupportedErr.FeatureName, true /* isParseError */) + if err != nil { + return nil, err + } } continue } @@ -616,7 +619,10 @@ func readPostgresStmt( case *tree.AlterTableAddColumn: if cmd.IfNotExists { if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + err := unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + if err != nil { + return err + } continue } return errors.Errorf("unsupported statement: %s", stmt) @@ -641,7 +647,10 @@ func readPostgresStmt( } default: if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + err := unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + if err != nil { + return err + } continue } return errors.Errorf("unsupported statement: %s", stmt) @@ -649,8 +658,7 @@ func readPostgresStmt( } case *tree.AlterTableOwner: if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) - return nil + return unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) } return errors.Errorf("unsupported statement: %s", stmt) case *tree.CreateSequence: @@ -663,8 +671,7 @@ func readPostgresStmt( } case *tree.AlterSequence: if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) - return nil + return unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) } return errors.Errorf("unsupported %T statement: %s", stmt, stmt) // Some SELECT statements mutate schema. Search for those here. @@ -693,7 +700,10 @@ func readPostgresStmt( err := errors.Errorf("unsupported function call: %s in stmt: %s", expr.Func.String(), stmt.String()) if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -735,7 +745,10 @@ func readPostgresStmt( default: err := errors.Errorf("unsupported %T SELECT expr: %s", expr, expr) if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -744,7 +757,10 @@ func readPostgresStmt( default: err := errors.Errorf("unsupported %T SELECT %s", sel, sel) if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } return nil } return err @@ -790,8 +806,7 @@ func readPostgresStmt( // - ANALYZE is syntactic sugar for CreateStatistics. It can be ignored // because the auto stats stuff will pick up the changes and run if needed. if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) - return nil + return unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) } return errors.Errorf("unsupported %T statement: %s", stmt, stmt) case error: @@ -800,8 +815,7 @@ func readPostgresStmt( } default: if ignoreUnsupportedStmts { - unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) - return nil + return unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) } return errors.Errorf("unsupported %T statement: %s", stmt, stmt) } @@ -845,6 +859,7 @@ type pgDumpReader struct { opts roachpb.PgDumpOptions walltime int64 colMap map[*row.DatumRowConverter](map[string]int) + jobID int64 unsupportedStmtLogger *unsupportedStmtLogger evalCtx *tree.EvalContext } @@ -854,6 +869,7 @@ var _ inputConverter = &pgDumpReader{} // newPgDumpReader creates a new inputConverter for pg_dump files. func newPgDumpReader( ctx context.Context, + jobID int64, kvCh chan row.KVBatch, opts roachpb.PgDumpOptions, walltime int64, @@ -895,6 +911,7 @@ func newPgDumpReader( opts: opts, walltime: walltime, colMap: colMap, + jobID: jobID, evalCtx: evalCtx, }, nil } @@ -911,15 +928,16 @@ func (m *pgDumpReader) readFiles( user security.SQLUsername, ) error { // Setup logger to handle unsupported DML statements seen in the PGDUMP file. - m.unsupportedStmtLogger = makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported, - format.PgDump.IgnoreUnsupportedLog, dataIngestion, makeExternalStorage) + m.unsupportedStmtLogger = makeUnsupportedStmtLogger(ctx, user, + m.jobID, format.PgDump.IgnoreUnsupported, format.PgDump.IgnoreUnsupportedLog, dataIngestion, + makeExternalStorage) err := readInputFiles(ctx, dataFiles, resumePos, format, m.readFile, makeExternalStorage, user) if err != nil { return err } - return m.unsupportedStmtLogger.flush(ctx, user) + return m.unsupportedStmtLogger.flush() } func (m *pgDumpReader) readFile( @@ -976,7 +994,10 @@ func (m *pgDumpReader) readFile( if m.unsupportedStmtLogger.ignoreUnsupported { logLine := fmt.Sprintf("%s: unsupported by IMPORT\n", i.Rows.Select.String()) - m.unsupportedStmtLogger.log(logLine, false /* isParseError */) + err := m.unsupportedStmtLogger.log(logLine, false /* isParseError */) + if err != nil { + return err + } continue } return errors.Errorf("unsupported: %s", i.Rows.Select) @@ -1127,7 +1148,10 @@ func (m *pgDumpReader) readFile( if !ok { err := errors.Errorf("unsupported %T Select: %v", i.Select, i.Select) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1135,7 +1159,10 @@ func (m *pgDumpReader) readFile( if len(sc.Exprs) != 1 { err := errors.Errorf("unsupported %d select args: %v", len(sc.Exprs), sc.Exprs) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1144,7 +1171,10 @@ func (m *pgDumpReader) readFile( if !ok { err := errors.Errorf("unsupported select arg %T: %v", sc.Exprs[0].Expr, sc.Exprs[0].Expr) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1154,7 +1184,10 @@ func (m *pgDumpReader) readFile( case "search_path", "pg_catalog.set_config": err := errors.Errorf("unsupported %d fn args in select: %v", len(fn.Exprs), fn.Exprs) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1162,7 +1195,10 @@ func (m *pgDumpReader) readFile( if args := len(fn.Exprs); args < 2 || args > 3 { err := errors.Errorf("unsupported %d fn args in select: %v", len(fn.Exprs), fn.Exprs) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1183,7 +1219,10 @@ func (m *pgDumpReader) readFile( if !ok { err := errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[1], fn.Exprs[1]) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1198,7 +1237,10 @@ func (m *pgDumpReader) readFile( if !ok { err := errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[2], fn.Exprs[2]) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1232,7 +1274,10 @@ func (m *pgDumpReader) readFile( default: err := errors.Errorf("unsupported function %s in stmt %s", funcName, i.Select.String()) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err @@ -1248,7 +1293,10 @@ func (m *pgDumpReader) readFile( default: err := errors.Errorf("unsupported %T statement: %v", i, i) if m.unsupportedStmtLogger.ignoreUnsupported { - m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + err := m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + if err != nil { + return err + } continue } return err