57827: importccl: cleanup how we ignore stmts in IMPORT PGDUMP r=pbardea,miretskiy a=adityamaru

There are two kinds of unsupported statements when processing an IMPORT
PGDUMP:
- Unparseable
- Parseable, but unsupported

Previously, we used a set of regexes to skip statements in the first
category. Statements in the second category were skipped when we
encountered an AST node of that kind.

This change does not ignore any additional statements, but it removes the
regexes. It does this by leaning on existing yacc grammar rules, or by adding
new "unsupported" rules. These unsupported rules raise a special kind of
error, decorated with the required information, which we catch during
IMPORT PGDUMP.

This change also introduces an `ignore_unsupported` flag for IMPORT PGDUMP
which allows users to skip all such statements (of both categories
mentioned above). If the flag is not set, the import fails with either a
parse error or an unsupported-statement error.

Release note (sql change): New IMPORT PGDUMP option `ignore_unsupported`
to skip over all the unsupported PGDUMP stmts. The collection of these
statements will be appropriately documented.
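
As a rough illustration of the dispatch this enables (a sketch only; the error type and helper below are hypothetical, not the actual implementation):

```go
package main

import (
	"errors"
	"fmt"
)

// unsupportedError is a hypothetical stand-in for the special error kind that
// the grammar's "unsupported" rules would return, carrying the offending text.
type unsupportedError struct{ stmt string }

func (e *unsupportedError) Error() string {
	return fmt.Sprintf("unsupported statement: %s", e.stmt)
}

// handleStmtErr decides what to do with a statement that failed to build:
// skip (and record) it when ignore_unsupported is set, otherwise surface the error.
func handleStmtErr(err error, ignoreUnsupported bool, logSkip func(string)) error {
	var u *unsupportedError
	if errors.As(err, &u) && ignoreUnsupported {
		logSkip(u.stmt) // the skipped statement can later be written to a log file
		return nil
	}
	return err
}

func main() {
	err := &unsupportedError{stmt: "CREATE TRIGGER ..."}
	fmt.Println(handleStmtErr(err, true, func(s string) { fmt.Println("skipped:", s) }))
}
```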

59716: storage: limit RevertRange batches to 32mb r=dt a=dt

The existing limit on key count/span count can produce batches in excess of 64mb
if, for example, the keys are very large. Such batches are then rejected for
exceeding the raft command size limit.

This adds an additional hard-coded limit of 32mb on the write batch to which keys
or spans to clear are added (if the command is executed against a non-Batch, the
limit is ignored). The size of the batch is re-checked once every 32 keys.

Release note (bug fix): avoid creating batches that exceed the raft command limit (64mb) when reverting ranges that contain very large keys.
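
A minimal sketch of the size check described above (names and structure are illustrative, not the actual storage code):

```go
package main

import "fmt"

const (
	maxBatchBytes = 32 << 20 // assumed 32 MiB cap on the write batch
	checkEvery    = 32       // the size is only re-checked once every 32 keys
)

// clearKeys adds keys to a batch and, every 32 keys, compares the accumulated
// byte size against the cap so no single batch can grow past the raft command
// size limit.
func clearKeys(keys [][]byte, flush func(batch [][]byte)) {
	var batch [][]byte
	var batchBytes int
	for i, k := range keys {
		batch = append(batch, k)
		batchBytes += len(k)
		if (i+1)%checkEvery == 0 && batchBytes >= maxBatchBytes {
			flush(batch)
			batch, batchBytes = nil, 0
		}
	}
	if len(batch) > 0 {
		flush(batch)
	}
}

func main() {
	clearKeys([][]byte{[]byte("a"), []byte("b")}, func(b [][]byte) {
		fmt.Printf("flushing %d keys\n", len(b))
	})
}
```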

60466: sql: test namespace entry for multi-region enum is drained properly r=arulajmani a=arulajmani

Testing-only patch. When the primary region is removed from a
multi-region database, we queue up an async job to reclaim the namespace
entry for the multi-region type descriptor (and its array alias). This
patch adds a test to make sure that the namespace entry is drained
properly even if the job runs into an error.

This patch adds a new testing knob to the type schema changer,
RunAfterAllFailOrCancel, to sync on the `OnFailOrCancel` job
completing.

Release note: None
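
A sketch of how such a knob is typically used to synchronize a test with the job's cleanup (the struct below is a cut-down stand-in, not the real knob definitions):

```go
package main

import "fmt"

// typeSchemaChangerTestingKnobs is a simplified stand-in: the schema changer
// calls the hook (if set) once its OnFailOrCancel cleanup has finished, and
// the test blocks on a channel until that happens.
type typeSchemaChangerTestingKnobs struct {
	RunAfterAllFailOrCancel func()
}

func main() {
	done := make(chan struct{})
	knobs := typeSchemaChangerTestingKnobs{
		RunAfterAllFailOrCancel: func() { close(done) },
	}

	// Stand-in for the async type-change job hitting its failure path and
	// invoking the knob after OnFailOrCancel completes.
	go func() {
		if knobs.RunAfterAllFailOrCancel != nil {
			knobs.RunAfterAllFailOrCancel()
		}
	}()

	<-done // only now is it safe to assert that the namespace entry was drained
	fmt.Println("OnFailOrCancel completed")
}
```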

60499: storage: unsafe key mangling for MVCCIterator r=sumeerbhola a=sumeerbhola

Unsafe key mangling can uncover bugs in code that misuses unsafe
keys. It uncovered the bug fixed in
https://github.com/cockroachdb/cockroach/pull/60460/files#diff-84108c53fd1e766ef8802b87b394981d3497d87c40d86e084f2ed77bba0ca71a

Release note: None
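
The idea can be sketched with a toy iterator (not the real MVCCIterator API): by deliberately overwriting the previously returned unsafe buffer on every step, code that incorrectly retains an unsafe key across iteration is exposed immediately.

```go
package main

import "fmt"

// iter is a toy iterator: UnsafeKey returns a buffer that is only valid until
// the next call to Next.
type iter struct {
	keys []string
	idx  int
	buf  []byte
}

func (it *iter) Valid() bool { return it.idx < len(it.keys) }

// Next advances the iterator and deliberately mangles the previously returned
// buffer, so any caller that held on to it sees garbage.
func (it *iter) Next() {
	for i := range it.buf {
		it.buf[i] = 'x'
	}
	it.idx++
}

func (it *iter) UnsafeKey() []byte {
	it.buf = append(it.buf[:0], it.keys[it.idx]...)
	return it.buf
}

func main() {
	it := &iter{keys: []string{"a1", "b2"}}
	var saved []byte
	for ; it.Valid(); it.Next() {
		saved = it.UnsafeKey() // bug: retaining an unsafe key across Next
	}
	fmt.Printf("retained key is now mangled: %q\n", saved)
}
```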

60560: opt: cast to identical types for set operations r=RaduBerinde a=RaduBerinde

This change makes the optbuilder more strict when building set
operations. Previously, it could build expressions whose
corresponding left/right types were `Equivalent()` but not
`Identical()`. This leads to errors in vectorized execution when we,
for example, try to union an INT8 with an INT4.

We now make the types on both sides `Identical()`, adding casts as
necessary. We make a best-effort attempt to use the larger
numeric type when possible (e.g. int4->int8, int->float, float->decimal).

Fixes #59148.

Release note (bug fix): fixed execution errors for some queries that
use set operations (UNION / EXCEPT / INTERSECT) where a column has
types of different widths on the two sides (e.g. INT4 vs INT8).
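
The best-effort preference for the larger numeric type can be sketched as follows (type names and ordering are illustrative, not the optbuilder's actual code):

```go
package main

import "fmt"

// widerNumeric picks the larger of two numeric type families as the common
// cast target for the two sides of a set operation
// (int4 -> int8, int -> float, float -> decimal).
func widerNumeric(left, right string) string {
	rank := map[string]int{"int4": 1, "int8": 2, "float": 3, "decimal": 4}
	if rank[right] > rank[left] {
		return right
	}
	return left
}

func main() {
	// Both sides of `SELECT x::INT4 UNION SELECT y::INT8` would be cast to INT8.
	fmt.Println(widerNumeric("int4", "int8"))
}
```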

60601: opt: add test constraining partial index on virtual column r=mgartner a=mgartner

No code changes were necessary in order to generate a constrained scan
for a partial index on a virtual column.

Release note: None

60659: sql: fix dangling zone config on REGIONAL BY ROW -> REGIONAL BY TABLE r=ajstorm a=otan

We previously did not set NumReplicas to 0 for RBR -> RBT conversions.
As a result, the zone configuration did not clean up and inherit properly
after the async job that drops the old indexes completed, since it no
longer treated the RBR zone config as a placeholder, leaving a
dangling reference. This patch fixes that.

N.B.: In theory there's nothing wrong with this. But this patch makes
SHOW ZONE CONFIGURATION consistent between converting RBR to RBT
and creating an RBT table directly.

Release note: None
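
A simplified sketch of the placeholder property the fix relies on (a cut-down stand-in, not the actual zonepb helper): a zone config whose NumReplicas is explicitly 0 exists only to carry per-index subzone configs, so everything else inherits from the parent.

```go
package main

import "fmt"

// zoneConfig is a cut-down stand-in for the real zone configuration proto.
type zoneConfig struct {
	NumReplicas *int32
}

// isPlaceholder reports whether the config is a subzone placeholder, i.e. its
// NumReplicas is explicitly set to 0.
func isPlaceholder(z zoneConfig) bool {
	return z.NumReplicas != nil && *z.NumReplicas == 0
}

func main() {
	zero := int32(0)
	fmt.Println(isPlaceholder(zoneConfig{NumReplicas: &zero})) // true
	fmt.Println(isPlaceholder(zoneConfig{}))                   // false
}
```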



Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: David Taylor <[email protected]>
Co-authored-by: arulajmani <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
8 people committed Feb 18, 2021
8 parents d67d6cc + 42f76d6 + 89fcd2c + df14643 + 9db3dc3 + 4037bd8 + 2d2c867 + 718034b commit 40a35fe
Showing 32 changed files with 1,478 additions and 378 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/importccl/import_processor_test.go
@@ -116,8 +116,7 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh,
nil /* seqChunkProvider */)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* seqChunkProvider */)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
@@ -953,7 +952,8 @@ func pgDumpFormat() roachpb.IOFileFormat {
return roachpb.IOFileFormat{
Format: roachpb.IOFileFormat_PgDump,
PgDump: roachpb.PgDumpOptions{
MaxRowSize: 64 * 1024,
MaxRowSize: 64 * 1024,
IgnoreUnsupported: true,
},
}
}
133 changes: 131 additions & 2 deletions pkg/ccl/importccl/import_stmt.go
@@ -9,6 +9,7 @@
package importccl

import (
"bytes"
"context"
"fmt"
"io/ioutil"
@@ -100,6 +101,12 @@ const (
avroSchema = "schema"
avroSchemaURI = "schema_uri"

pgDumpIgnoreAllUnsupported = "ignore_unsupported"
pgDumpIgnoreShuntFileDest = "ignored_stmt_log"
pgDumpUnsupportedSchemaStmtLog = "unsupported_schema_stmts"
pgDumpUnsupportedDataStmtLog = "unsupported_data-_stmts"
pgDumpMaxLoggedStmts = 10

// RunningStatusImportBundleParseSchema indicates to the user that a bundle format
// schema is being parsed
runningStatusImportBundleParseSchema jobs.RunningStatus = "parsing schema on Import Bundle"
@@ -135,6 +142,9 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{
avroRecordsSeparatedBy: sql.KVStringOptRequireValue,
avroBinRecords: sql.KVStringOptRequireNoValue,
avroJSONRecords: sql.KVStringOptRequireNoValue,

pgDumpIgnoreAllUnsupported: sql.KVStringOptRequireNoValue,
pgDumpIgnoreShuntFileDest: sql.KVStringOptRequireValue,
}

func makeStringSet(opts ...string) map[string]struct{} {
@@ -164,7 +174,8 @@ var mysqlOutAllowedOptions = makeStringSet(
)
var mysqlDumpAllowedOptions = makeStringSet(importOptionSkipFKs, csvRowLimit)
var pgCopyAllowedOptions = makeStringSet(pgCopyDelimiter, pgCopyNull, optMaxRowSize)
var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit)
var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit,
pgDumpIgnoreAllUnsupported, pgDumpIgnoreShuntFileDest)

// DROP is required because the target table needs to be taken offline during
// IMPORT INTO.
@@ -615,6 +626,16 @@ func importPlanHook(
maxRowSize = int32(sz)
}
format.PgDump.MaxRowSize = maxRowSize
if _, ok := opts[pgDumpIgnoreAllUnsupported]; ok {
format.PgDump.IgnoreUnsupported = true
}

if dest, ok := opts[pgDumpIgnoreShuntFileDest]; ok {
if !format.PgDump.IgnoreUnsupported {
return errors.New("cannot log unsupported PGDUMP stmts without `ignore_unsupported` option")
}
format.PgDump.IgnoreUnsupportedLog = dest
}

if override, ok := opts[csvRowLimit]; ok {
rowLimit, err := strconv.Atoi(override)
@@ -1295,6 +1316,102 @@ func (r *importResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
}
}

type loggerKind int

const (
schemaParsing loggerKind = iota
dataIngestion
)

// unsupportedStmtLogger is responsible for handling unsupported PGDUMP SQL
// statements seen during the import.
type unsupportedStmtLogger struct {
// Values are initialized based on the options specified in the IMPORT PGDUMP
// stmt.
ignoreUnsupported bool
ignoreUnsupportedLogDest string
externalStorage cloud.ExternalStorageFactory

// logBuffer holds the string to be flushed to the ignoreUnsupportedLogDest.
logBuffer *bytes.Buffer
numIgnoredStmts int

loggerType loggerKind
}

func makeUnsupportedStmtLogger(
ignoreUnsupported bool,
unsupportedLogDest string,
loggerType loggerKind,
externalStorage cloud.ExternalStorageFactory,
) *unsupportedStmtLogger {
l := &unsupportedStmtLogger{
ignoreUnsupported: ignoreUnsupported,
ignoreUnsupportedLogDest: unsupportedLogDest,
loggerType: loggerType,
logBuffer: new(bytes.Buffer),
externalStorage: externalStorage,
}
header := "Unsupported statements during schema parse phase:\n\n"
if loggerType == dataIngestion {
header = "Unsupported statements during data ingestion phase:\n\n"
}
l.logBuffer.WriteString(header)
return l
}

func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) {
// We have already logged parse errors during the schema ingestion phase, so
// skip them to avoid duplicate entries.
skipLoggingParseErr := isParseError && u.loggerType == dataIngestion
if u.ignoreUnsupportedLogDest == "" || skipLoggingParseErr {
return
}

if u.numIgnoredStmts < pgDumpMaxLoggedStmts {
if isParseError {
logLine = fmt.Sprintf("%s: could not be parsed\n", logLine)
} else {
logLine = fmt.Sprintf("%s: unsupported by IMPORT\n", logLine)
}
u.logBuffer.Write([]byte(logLine))
}
u.numIgnoredStmts++
}

func (u *unsupportedStmtLogger) flush(ctx context.Context, user security.SQLUsername) error {
if u.ignoreUnsupportedLogDest == "" {
return nil
}

numLoggedStmts := pgDumpMaxLoggedStmts
if u.numIgnoredStmts < pgDumpMaxLoggedStmts {
numLoggedStmts = u.numIgnoredStmts
}
u.logBuffer.WriteString(fmt.Sprintf("\nLogging %d out of %d ignored statements.\n",
numLoggedStmts, u.numIgnoredStmts))

conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, user)
if err != nil {
return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP")
}
var s cloud.ExternalStorage
if s, err = u.externalStorage(ctx, conf); err != nil {
return errors.New("failed to log unsupported stmts during IMPORT PGDUMP")
}
defer s.Close()

logFileName := pgDumpUnsupportedSchemaStmtLog
if u.loggerType == dataIngestion {
logFileName = pgDumpUnsupportedDataStmtLog
}
err = s.WriteFile(ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes()))
if err != nil {
return errors.Wrap(err, "failed to log unsupported stmts to log during IMPORT PGDUMP")
}
return nil
}

// parseAndCreateBundleTableDescs parses and creates the table
// descriptors for bundle formats.
func parseAndCreateBundleTableDescs(
@@ -1344,7 +1461,19 @@ func parseAndCreateBundleTableDescs(
tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals, owner, walltime)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner)

// Set up a logger to handle unsupported DDL statements in the PGDUMP file.
unsupportedStmtLogger := makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported,
format.PgDump.IgnoreUnsupportedLog, schemaParsing, p.ExecCfg().DistSQLSrv.ExternalStorage)

tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID,
walltime, fks, int(format.PgDump.MaxRowSize), owner, unsupportedStmtLogger)

logErr := unsupportedStmtLogger.flush(ctx, p.User())
if logErr != nil {
return nil, logErr
}

default:
return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String())
}
