Skip to content

Commit

Permalink
Merge #62228
Browse files Browse the repository at this point in the history
62228: importccl: log all ignored pgdump stmts in different log files r=miretskiy a=adityamaru

Previously, we would only log the first 10 ignored stmts, and skip over
logging the rest. This was done due to concerns about memory usage when
buffering, as ExternalStorage does not support streaming writes (yet).
This change writes a new file every time we have buffered 10 ignored
stmts. The files are written in the subdirectory
`import<jobID>/(unsupported_schema_stmts|unsupported_data_stmts)/<filenum>.log`

This allows for several imports to also log unsupported stmts to the
same user provided location without stepping on each others toes.

Release note (bug fix): log all unsupported pgdump statements across
smaller log files that can be found in the subdir
`import<jobID>/(unsupported_schema_stmts|unsupported_data_stmts)/<filenum>.log`

Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Mar 19, 2021
2 parents 764eb00 + e1dd595 commit 24e76d3
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 88 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func makeInputConverter(
return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
case roachpb.IOFileFormat_PgDump:
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx)
return newPgDumpReader(ctx, int64(spec.Progress.JobID), kvCh, spec.Format.PgDump,
spec.WalltimeNanos, spec.Tables, evalCtx)
case roachpb.IOFileFormat_Avro:
return newAvroInputReader(
kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
Expand Down
92 changes: 59 additions & 33 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"io/ioutil"
"math"
"net/url"
"path"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -108,7 +109,6 @@ const (
pgDumpIgnoreShuntFileDest = "log_ignored_statements"
pgDumpUnsupportedSchemaStmtLog = "unsupported_schema_stmts"
pgDumpUnsupportedDataStmtLog = "unsupported_data_stmts"
pgDumpMaxLoggedStmts = 10

// RunningStatusImportBundleParseSchema indicates to the user that a bundle format
// schema is being parsed
Expand Down Expand Up @@ -150,6 +150,16 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{
pgDumpIgnoreShuntFileDest: sql.KVStringOptRequireValue,
}

var pgDumpMaxLoggedStmts = 1024

func testingSetMaxLogIgnoredImportStatements(maxLogSize int) (cleanup func()) {
prevLogSize := pgDumpMaxLoggedStmts
pgDumpMaxLoggedStmts = maxLogSize
return func() {
pgDumpMaxLoggedStmts = prevLogSize
}
}

func makeStringSet(opts ...string) map[string]struct{} {
res := make(map[string]struct{}, len(opts))
for _, opt := range opts {
Expand Down Expand Up @@ -1572,6 +1582,10 @@ const (
// unsupportedStmtLogger is responsible for handling unsupported PGDUMP SQL
// statements seen during the import.
type unsupportedStmtLogger struct {
ctx context.Context
user security.SQLUsername
jobID int64

// Values are initialized based on the options specified in the IMPORT PGDUMP
// stmt.
ignoreUnsupported bool
Expand All @@ -1582,79 +1596,88 @@ type unsupportedStmtLogger struct {
logBuffer *bytes.Buffer
numIgnoredStmts int

// Incremented every time the logger flushes. It is used as the suffix of the
// log file written to external storage.
flushCount int

loggerType loggerKind
}

func makeUnsupportedStmtLogger(
ctx context.Context,
user security.SQLUsername,
jobID int64,
ignoreUnsupported bool,
unsupportedLogDest string,
loggerType loggerKind,
externalStorage cloud.ExternalStorageFactory,
) *unsupportedStmtLogger {
l := &unsupportedStmtLogger{
return &unsupportedStmtLogger{
ctx: ctx,
user: user,
jobID: jobID,
ignoreUnsupported: ignoreUnsupported,
ignoreUnsupportedLogDest: unsupportedLogDest,
loggerType: loggerType,
logBuffer: new(bytes.Buffer),
externalStorage: externalStorage,
}
header := "Unsupported statements during schema parse phase:\n\n"
if loggerType == dataIngestion {
header = "Unsupported statements during data ingestion phase:\n\n"
}
l.logBuffer.WriteString(header)
return l
}

func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) {
func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) error {
// We have already logged parse errors during the schema ingestion phase, so
// skip them to avoid duplicate entries.
skipLoggingParseErr := isParseError && u.loggerType == dataIngestion
if u.ignoreUnsupportedLogDest == "" || skipLoggingParseErr {
return
return nil
}

if u.numIgnoredStmts < pgDumpMaxLoggedStmts {
if isParseError {
logLine = fmt.Sprintf("%s: could not be parsed\n", logLine)
} else {
logLine = fmt.Sprintf("%s: unsupported by IMPORT\n", logLine)
// Flush to a file if we have hit the max size of our buffer.
if u.numIgnoredStmts >= pgDumpMaxLoggedStmts {
err := u.flush()
if err != nil {
return err
}
u.logBuffer.Write([]byte(logLine))
}

if isParseError {
logLine = fmt.Sprintf("%s: could not be parsed\n", logLine)
} else {
logLine = fmt.Sprintf("%s: unsupported by IMPORT\n", logLine)
}
u.logBuffer.Write([]byte(logLine))
u.numIgnoredStmts++
return nil
}

func (u *unsupportedStmtLogger) flush(ctx context.Context, user security.SQLUsername) error {
func (u *unsupportedStmtLogger) flush() error {
if u.ignoreUnsupportedLogDest == "" {
return nil
}

numLoggedStmts := pgDumpMaxLoggedStmts
if u.numIgnoredStmts < pgDumpMaxLoggedStmts {
numLoggedStmts = u.numIgnoredStmts
}
u.logBuffer.WriteString(fmt.Sprintf("\nLogging %d out of %d ignored statements.\n",
numLoggedStmts, u.numIgnoredStmts))

conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, user)
conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, u.user)
if err != nil {
return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP")
}
var s cloud.ExternalStorage
if s, err = u.externalStorage(ctx, conf); err != nil {
if s, err = u.externalStorage(u.ctx, conf); err != nil {
return errors.New("failed to log unsupported stmts during IMPORT PGDUMP")
}
defer s.Close()

logFileName := pgDumpUnsupportedSchemaStmtLog
logFileName := fmt.Sprintf("import%d", u.jobID)
if u.loggerType == dataIngestion {
logFileName = pgDumpUnsupportedDataStmtLog
logFileName = path.Join(logFileName, pgDumpUnsupportedDataStmtLog, fmt.Sprintf("%d.log", u.flushCount))
} else {
logFileName = path.Join(logFileName, pgDumpUnsupportedSchemaStmtLog, fmt.Sprintf("%d.log", u.flushCount))
}
err = s.WriteFile(ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes()))
err = s.WriteFile(u.ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes()))
if err != nil {
return errors.Wrap(err, "failed to log unsupported stmts to log during IMPORT PGDUMP")
}
u.flushCount++
u.numIgnoredStmts = 0
u.logBuffer.Truncate(0)
return nil
}

Expand All @@ -1671,6 +1694,7 @@ func parseAndCreateBundleTableDescs(
format roachpb.IOFileFormat,
walltime int64,
owner security.SQLUsername,
jobID jobspb.JobID,
) ([]*tabledesc.Mutable, []*schemadesc.Mutable, error) {

var schemaDescs []*schemadesc.Mutable
Expand Down Expand Up @@ -1714,13 +1738,14 @@ func parseAndCreateBundleTableDescs(
evalCtx := &p.ExtendedEvalContext().EvalContext

// Setup a logger to handle unsupported DDL statements in the PGDUMP file.
unsupportedStmtLogger := makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported,
format.PgDump.IgnoreUnsupportedLog, schemaParsing, p.ExecCfg().DistSQLSrv.ExternalStorage)
unsupportedStmtLogger := makeUnsupportedStmtLogger(ctx, p.User(), int64(jobID),
format.PgDump.IgnoreUnsupported, format.PgDump.IgnoreUnsupportedLog, schemaParsing,
p.ExecCfg().DistSQLSrv.ExternalStorage)

tableDescs, schemaDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName,
parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner, unsupportedStmtLogger)

logErr := unsupportedStmtLogger.flush(ctx, p.User())
logErr := unsupportedStmtLogger.flush()
if logErr != nil {
return nil, nil, logErr
}
Expand Down Expand Up @@ -1765,7 +1790,8 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter
walltime := p.ExecCfg().Clock.Now().WallTime

if tableDescs, schemaDescs, err = parseAndCreateBundleTableDescs(
ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime, owner); err != nil {
ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime, owner,
r.job.ID()); err != nil {
return err
}

Expand Down
66 changes: 39 additions & 27 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"path"
"path/filepath"
"regexp"
"strings"
Expand Down Expand Up @@ -5788,7 +5789,7 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) {
t.Run("ignore-unsupported", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo; USE foo;")
sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements", srv.URL)
// Check that statements which are not expected to be ignored, are still
// Check that statements that are not expected to be ignored, are still
// processed.
sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}})
})
Expand All @@ -5800,16 +5801,20 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) {

t.Run("require-both-unsupported-options", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo2; USE foo2;")
ignoredLog := `userfile:///ignore.log`
ignoredLog := `userfile:///ignore`
sqlDB.ExpectErr(t, "cannot log unsupported PGDUMP stmts without `ignore_unsupported_statements` option",
"IMPORT PGDUMP ($1) WITH log_ignored_statements=$2", srv.URL, ignoredLog)
})

t.Run("log-unsupported-stmts", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo3; USE foo3;")
ignoredLog := `userfile:///ignore.log`
sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements, log_ignored_statements=$2",
srv.URL, ignoredLog)
ignoredLog := `userfile:///ignore`
defer testingSetMaxLogIgnoredImportStatements(10 /* maxLogSize */)()
var importJobID int
var unused interface{}
sqlDB.QueryRow(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements, "+
"log_ignored_statements=$2", srv.URL, ignoredLog).Scan(&importJobID, &unused, &unused,
&unused, &unused, &unused)
// Check that statements which are not expected to be ignored, are still
// processed.
sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}})
Expand All @@ -5823,13 +5828,24 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) {
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB())
require.NoError(t, err)
defer store.Close()
content, err := store.ReadFile(ctx, pgDumpUnsupportedSchemaStmtLog)
require.NoError(t, err)
descBytes, err := ioutil.ReadAll(content)
require.NoError(t, err)
expectedSchemaLog := `Unsupported statements during schema parse phase:

create trigger: could not be parsed
// We expect there to be two log files since we have 13 unsupported statements.
dirName := fmt.Sprintf("import%d", importJobID)
checkFiles := func(expectedFileContent []string, logSubdir string) {
files, err := store.ListFiles(ctx, fmt.Sprintf("*/%s/*", logSubdir))
require.NoError(t, err)
for i, file := range files {
require.Equal(t, file, path.Join(dirName, logSubdir, fmt.Sprintf("%d.log", i)))
content, err := store.ReadFile(ctx, file)
require.NoError(t, err)
descBytes, err := ioutil.ReadAll(content)
require.NoError(t, err)
require.Equal(t, []byte(expectedFileContent[i]), descBytes)
}
}

schemaFileContents := []string{
`create trigger: could not be parsed
revoke privileges on sequence: could not be parsed
revoke privileges on sequence: could not be parsed
grant privileges on sequence: could not be parsed
Expand All @@ -5839,24 +5855,20 @@ create extension if not exists with: could not be parsed
create function: could not be parsed
alter function: could not be parsed
COMMENT ON TABLE t IS 'This should be skipped': unsupported by IMPORT
`,
`COMMENT ON DATABASE t IS 'This should be skipped': unsupported by IMPORT
COMMENT ON COLUMN t IS 'This should be skipped': unsupported by IMPORT
unsupported function call: set_config in stmt: SELECT set_config('search_path', '', false): unsupported by IMPORT
`,
}
checkFiles(schemaFileContents, pgDumpUnsupportedSchemaStmtLog)

Logging 10 out of 13 ignored statements.
`
require.Equal(t, []byte(expectedSchemaLog), descBytes)

expectedDataLog := `Unsupported statements during data ingestion phase:
unsupported 3 fn args in select: ['search_path' '' false]: unsupported by IMPORT
ingestionFileContents := []string{
`unsupported 3 fn args in select: ['search_path' '' false]: unsupported by IMPORT
unsupported *tree.Delete statement: DELETE FROM geometry_columns WHERE (f_table_name = 'nyc_census_blocks') AND (f_table_schema = 'public'): unsupported by IMPORT
Logging 2 out of 2 ignored statements.
`

content, err = store.ReadFile(ctx, pgDumpUnsupportedDataStmtLog)
require.NoError(t, err)
descBytes, err = ioutil.ReadAll(content)
require.NoError(t, err)
require.Equal(t, []byte(expectedDataLog), descBytes)
`,
}
checkFiles(ingestionFileContents, pgDumpUnsupportedDataStmtLog)
})
}

Expand Down
Loading

0 comments on commit 24e76d3

Please sign in to comment.