From 2af7e56e949d3ff8507089b85da0550b487e313f Mon Sep 17 00:00:00 2001 From: anzoteh96 Date: Mon, 27 Jul 2020 14:27:48 -0400 Subject: [PATCH] changes as per review --- pkg/ccl/importccl/import_stmt_test.go | 11 ++- pkg/ccl/importccl/read_import_base.go | 2 +- pkg/ccl/importccl/read_import_pgdump.go | 2 +- pkg/sql/row/row_converter.go | 108 +++++++++++++----------- 4 files changed, 69 insertions(+), 54 deletions(-) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index a41f90fa65cd..f0aa6a2fc4e6 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -3263,6 +3263,11 @@ func TestImportDefault(t *testing.T) { rowIDCols: []string{selectNotNull("a")}, }, { + // unique_rowid()+unique_rowid() won't work as the rowid produced by import + // has its leftmost bit set to 1, and adding them causes overflow. A way to + // get around this is to take each unique_rowid() modulo a number, M. Here + // M = 1e9+7 is used, given that it's big enough and is a prime, which is + // generally effective in avoiding collisions. name: "rowid+rowid", create: fmt.Sprintf( `a INT DEFAULT (unique_rowid() %% %d) + (unique_rowid() %% %d), b INT PRIMARY KEY, c STRING`, M, M), @@ -4430,14 +4435,12 @@ func TestImportPgDumpGeo(t *testing.T) { // Verify both created tables are identical. importCreate := sqlDB.QueryStr(t, "SELECT create_statement FROM [SHOW CREATE importdb.nyc_census_blocks]") - // Families are slightly different due to that rowid shows up in exec - // but not import (possibly due to the ALTER TABLE statement that makes + // Families are slightly different due to rowid showing up in exec but + // not import (possibly due to the ALTER TABLE statement that makes // gid a primary key), so add that into import to match exec.
importCreate[0][0] = strings.Replace(importCreate[0][0], "boroname, geom", "boroname, rowid, geom", 1) sqlDB.CheckQueryResults(t, "SELECT create_statement FROM [SHOW CREATE execdb.nyc_census_blocks]", importCreate) - // Drop the comparison of gid for import vs exec, then check that gid - // in import is indeed valid rowid. importCols := "blkid, popn_total, popn_white, popn_black, popn_nativ, popn_asian, popn_other, boroname" importSelect := sqlDB.QueryStr(t, fmt.Sprintf( "SELECT (%s) FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks", diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go index 1d2340b547e0..6ceb06e1b254 100644 --- a/pkg/ccl/importccl/read_import_base.go +++ b/pkg/ccl/importccl/read_import_base.go @@ -408,7 +408,7 @@ func makeDatumConverter( ctx context.Context, importCtx *parallelImportContext, fileCtx *importFileContext, ) (*row.DatumRowConverter, error) { conv, err := row.NewDatumRowConverter( - ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx.Copy(), importCtx.kvCh) + ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx, importCtx.kvCh) if err == nil { conv.KvBatch.Source = fileCtx.source } diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index a765b9097070..12fe406cd597 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -508,7 +508,7 @@ func newPgDumpReader( for i, col := range table.Desc.VisibleColumns() { colSubMap[col.Name] = i } - conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx.Copy(), kvCh) + conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx, kvCh) if err != nil { return nil, err } diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index b41a2f458e7b..a81724ea81c1 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -39,11 +39,21 @@ const ( 
cellInfoAddr tree.AnnotationIdx = iota + 1 ) -type cellInfoAnnot struct { - sourceID int32 - rowID int64 - uniqueRowIDInst int - uniqueRowIDTotal int +type cellInfoAnnotation struct { + sourceID int32 + rowID int64 + uniqueRowIDInstance int + uniqueRowIDTotal int +} + +func getCellInfoAnnotation(t *tree.Annotations) *cellInfoAnnotation { + return t.Get(cellInfoAddr).(*cellInfoAnnotation) +} + +func (c *cellInfoAnnotation) Reset(sourceID int32, rowID int64) { + c.sourceID = sourceID + c.rowID = rowID + c.uniqueRowIDInstance = 0 } // We don't want to call unique_rowid() for columns with such default expressions @@ -82,12 +92,12 @@ type cellInfoAnnot struct { // default expression unique_rowid() could cause collisions when IMPORTs are run // too close to each other. It will therefore be nice to fix this problem. func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) { - c := (*evalCtx.Annotations).Get(cellInfoAddr).(cellInfoAnnot) + c := getCellInfoAnnotation(evalCtx.Annotations) avoidCollisionsWithSQLsIDs := uint64(1 << 63) - shiftedIndex := int64(c.uniqueRowIDTotal)*c.rowID + int64(c.uniqueRowIDInst) + shiftedIndex := int64(c.uniqueRowIDTotal)*c.rowID + int64(c.uniqueRowIDInstance) returnIndex := (uint64(c.sourceID) << rowIDBits) ^ uint64(shiftedIndex) - c.uniqueRowIDInst++ - (*evalCtx.Annotations).Set(cellInfoAddr, c) + c.uniqueRowIDInstance++ + evalCtx.Annotations.Set(cellInfoAddr, c) return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil } @@ -95,13 +105,15 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, // of as we walk through the expressions in a row (at datumRowConverter creation // time). This will be handled by the sideEffect field: it will be called with an // annotation passed in that changes the counter. 
In the case of unique_rowid, for -// example, we want to keep track of the total number of unique_rowid occurences +// example, we want to keep track of the total number of unique_rowid occurrences // in a row. type customFunc struct { - sideEffect func(annotations *tree.Annotations) - override *tree.FunctionDefinition + visitorSideEffect func(annotations *tree.Annotations) + override *tree.FunctionDefinition } +var useDefaultBuiltin *customFunc + // Given that imports can be retried and resumed, we want to // ensure that the default functions return the same value given // the same arguments, even on retries. Therfore we decide to support @@ -110,18 +122,16 @@ type customFunc struct { var supportedImportFuncOverrides = map[string]*customFunc{ // These methods can be supported given that we set the statement // and transaction timestamp to be equal, i.e. the write timestamp. - "current_date": nil, - "current_timestamp": nil, - "localtimestamp": nil, - "now": nil, - "statement_timestamp": nil, - "timeofday": nil, - "transaction_timestamp": nil, + "current_date": useDefaultBuiltin, + "current_timestamp": useDefaultBuiltin, + "localtimestamp": useDefaultBuiltin, + "now": useDefaultBuiltin, + "statement_timestamp": useDefaultBuiltin, + "timeofday": useDefaultBuiltin, + "transaction_timestamp": useDefaultBuiltin, "unique_rowid": { - sideEffect: func(annot *tree.Annotations) { - c := annot.Get(cellInfoAddr).(cellInfoAnnot) - c.uniqueRowIDTotal++ - annot.Set(cellInfoAddr, c) + visitorSideEffect: func(annot *tree.Annotations) { + getCellInfoAnnotation(annot).uniqueRowIDTotal++ }, override: makeBuiltinOverride( tree.FunDefs["unique_rowid"], @@ -158,10 +168,12 @@ func (e *unsafeErrExpr) Eval(_ *tree.EvalContext) (tree.Datum, error) { // visitor walks the tree and ensures that any expression in the tree // that's not immutable is what we explicitly support. 
type importDefaultExprVisitor struct { - err error - ctx context.Context - annot *tree.Annotations - semaCtx *tree.SemaContext + err error + ctx context.Context + annotations *tree.Annotations + semaCtx *tree.SemaContext + // The volatile flag will be set if there's at least one volatile + // function appearing in the default expression. volatile bool } @@ -170,6 +182,9 @@ func (v *importDefaultExprVisitor) VisitPre(expr tree.Expr) (recurse bool, newEx return v.err == nil, expr } +const overrideImmutable = false +const overrideVolatile = true + // VisitPost implements tree.Visitor interface. func (v *importDefaultExprVisitor) VisitPost(expr tree.Expr) (newExpr tree.Expr) { if v.err != nil { @@ -182,16 +197,19 @@ func (v *importDefaultExprVisitor) VisitPost(expr tree.Expr) (newExpr tree.Expr) if custom, isSafe := supportedImportFuncOverrides[resolvedFnName]; !isSafe { v.err = errors.Newf(`function %s unsupported by IMPORT INTO`, resolvedFnName) } else { - // No override exists, means it's okay to use the definitions given in builtin.go. - if custom == nil { + if custom == useDefaultBuiltin { + // No override exists, means it's okay to use the definitions given in + // builtin.go. return expr } // Override exists, so we turn the volatile flag of the visitor to true. // In addition, the sideEffect function needs to be called to update any - // relevant counter (e.g. the total number of occurrences of the unique_rowid - // function in an expression). - v.volatile = true - custom.sideEffect(v.annot) + // relevant counter (e.g. the total number of occurrences of the + // unique_rowid function in an expression). 
+ v.volatile = overrideVolatile + if custom.visitorSideEffect != nil { + custom.visitorSideEffect(v.annotations) + } funcExpr := &tree.FuncExpr{ Func: tree.ResolvableFunctionReference{FunctionReference: custom.override}, Type: fn.Type, @@ -220,7 +238,7 @@ func SanitizeExprsForImport( typedExpr, err := sqlbase.SanitizeVarFreeExpr( ctx, expr, targetType, "import_default", &semaCtx, tree.VolatilityImmutable) if err == nil { - return typedExpr, false, nil + return typedExpr, overrideImmutable, nil } // Now that the expressions are not immutable, we first check that they // are of the correct type before checking for any unsupported functions @@ -229,7 +247,7 @@ func SanitizeExprsForImport( if err != nil { return nil, false, unsafeExpressionError(err, "type checking error", expr.String()) } - v := &importDefaultExprVisitor{annot: evalCtx.Annotations} + v := &importDefaultExprVisitor{annotations: evalCtx.Annotations} newExpr, _ := tree.WalkExpr(v, typedExpr) if v.err != nil { return nil, false, unsafeExpressionError(v.err, "expr walking error", expr.String()) @@ -448,7 +466,7 @@ func NewDatumRowConverter( c := &DatumRowConverter{ tableDesc: immutDesc, KvCh: kvCh, - EvalCtx: evalCtx, + EvalCtx: evalCtx.Copy(), } var targetColDescriptors []sqlbase.ColumnDescriptor @@ -519,7 +537,7 @@ func NewDatumRowConverter( } hidden := -1 annot := make(tree.Annotations, 1) - annot.Set(cellInfoAddr, cellInfoAnnot{uniqueRowIDInst: 0}) + annot.Set(cellInfoAddr, &cellInfoAnnotation{uniqueRowIDInstance: 0}) c.EvalCtx.Annotations = &annot for i := range cols { col := &cols[i] @@ -532,7 +550,7 @@ func NewDatumRowConverter( if col.DefaultExpr != nil { // Placeholder for columns with default values that will be evaluated when // each import row is being created. 
- typedExpr, volatile, err := SanitizeExprsForImport(ctx, evalCtx, defaultExprs[i], col.Type) + typedExpr, volatile, err := SanitizeExprsForImport(ctx, c.EvalCtx, defaultExprs[i], col.Type) if err != nil { // This expression may not be safe for import but we don't want to // call the user out at this stage: targeted columns may not have @@ -544,8 +562,10 @@ func NewDatumRowConverter( } } else { c.defaultCache[i] = typedExpr + // This default expression isn't volatile, so we can evaluate once here + // and memoize it. if !volatile { - c.defaultCache[i], err = c.defaultCache[i].Eval(evalCtx) + c.defaultCache[i], err = c.defaultCache[i].Eval(c.EvalCtx) if err != nil { return nil, errors.Wrapf(err, "error evaluating default expression") } @@ -573,14 +593,6 @@ func NewDatumRowConverter( const rowIDBits = 64 - builtins.NodeIDBits -func setCellInfoAnnot(a *tree.Annotations, sourceID int32, rowIndex int64) { - cellAnnot := a.Get(cellInfoAddr).(cellInfoAnnot) - cellAnnot.uniqueRowIDInst = 0 - cellAnnot.sourceID = sourceID - cellAnnot.rowID = rowIndex - a.Set(cellInfoAddr, cellAnnot) -} - // Row inserts kv operations into the current kv batch, and triggers a SendBatch // if necessary. func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex int64) error { @@ -588,7 +600,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in _, ok := c.IsTargetCol[i] return ok } - setCellInfoAnnot(c.EvalCtx.Annotations, sourceID, rowIndex) + getCellInfoAnnotation(c.EvalCtx.Annotations).Reset(sourceID, rowIndex) for i := range c.cols { col := &c.cols[i] if !isTargetCol(i) && col.DefaultExpr != nil {