Skip to content

Commit

Permalink
changes as per review
Browse files Browse the repository at this point in the history
  • Loading branch information
anzoteh96 committed Jul 27, 2020
1 parent df25b0c commit 2af7e56
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 54 deletions.
11 changes: 7 additions & 4 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3263,6 +3263,11 @@ func TestImportDefault(t *testing.T) {
rowIDCols: []string{selectNotNull("a")},
},
{
// unique_rowid()+unique_rowid() won't work because the rowid produced by
// import has its leftmost bit set to 1, so adding two of them overflows. A
// way around this is to take each unique_rowid() modulo a number, M. Here
// M = 1e9+7 is used because it is big enough and is a prime, which is
// generally effective in avoiding collisions.
name: "rowid+rowid",
create: fmt.Sprintf(
`a INT DEFAULT (unique_rowid() %% %d) + (unique_rowid() %% %d), b INT PRIMARY KEY, c STRING`, M, M),
Expand Down Expand Up @@ -4430,14 +4435,12 @@ func TestImportPgDumpGeo(t *testing.T) {

// Verify both created tables are identical.
importCreate := sqlDB.QueryStr(t, "SELECT create_statement FROM [SHOW CREATE importdb.nyc_census_blocks]")
// Families are slightly different due to that rowid shows up in exec
// but not import (possibly due to the ALTER TABLE statement that makes
// Families are slightly different due to rowid showing up in exec but
// not import (possibly due to the ALTER TABLE statement that makes
// gid a primary key), so add that into import to match exec.
importCreate[0][0] = strings.Replace(importCreate[0][0], "boroname, geom", "boroname, rowid, geom", 1)
sqlDB.CheckQueryResults(t, "SELECT create_statement FROM [SHOW CREATE execdb.nyc_census_blocks]", importCreate)

// Drop the comparison of gid for import vs exec, then check that gid
// in import is indeed valid rowid.
importCols := "blkid, popn_total, popn_white, popn_black, popn_nativ, popn_asian, popn_other, boroname"
importSelect := sqlDB.QueryStr(t, fmt.Sprintf(
"SELECT (%s) FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func makeDatumConverter(
ctx context.Context, importCtx *parallelImportContext, fileCtx *importFileContext,
) (*row.DatumRowConverter, error) {
conv, err := row.NewDatumRowConverter(
ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx.Copy(), importCtx.kvCh)
ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx, importCtx.kvCh)
if err == nil {
conv.KvBatch.Source = fileCtx.source
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func newPgDumpReader(
for i, col := range table.Desc.VisibleColumns() {
colSubMap[col.Name] = i
}
conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx.Copy(), kvCh)
conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx, kvCh)
if err != nil {
return nil, err
}
Expand Down
108 changes: 60 additions & 48 deletions pkg/sql/row/row_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,21 @@ const (
cellInfoAddr tree.AnnotationIdx = iota + 1
)

type cellInfoAnnot struct {
sourceID int32
rowID int64
uniqueRowIDInst int
uniqueRowIDTotal int
type cellInfoAnnotation struct {
sourceID int32
rowID int64
uniqueRowIDInstance int
uniqueRowIDTotal int
}

// getCellInfoAnnotation fetches the value stored under cellInfoAddr in the
// given annotations and returns it as a *cellInfoAnnotation. The type
// assertion is unchecked, so this panics if the slot holds anything else.
func getCellInfoAnnotation(t *tree.Annotations) *cellInfoAnnotation {
	stored := t.Get(cellInfoAddr)
	return stored.(*cellInfoAnnotation)
}

// Reset points the annotation at a new row: it records the given source and
// row IDs and restarts the per-row unique_rowid instance counter at zero.
// uniqueRowIDTotal is deliberately left untouched.
func (c *cellInfoAnnotation) Reset(sourceID int32, rowID int64) {
	c.uniqueRowIDInstance = 0
	c.sourceID, c.rowID = sourceID, rowID
}

// We don't want to call unique_rowid() for columns with such default expressions
Expand Down Expand Up @@ -82,26 +92,28 @@ type cellInfoAnnot struct {
// default expression unique_rowid() could cause collisions when IMPORTs are run
// too close to each other. It will therefore be nice to fix this problem.
func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := (*evalCtx.Annotations).Get(cellInfoAddr).(cellInfoAnnot)
c := getCellInfoAnnotation(evalCtx.Annotations)
avoidCollisionsWithSQLsIDs := uint64(1 << 63)
shiftedIndex := int64(c.uniqueRowIDTotal)*c.rowID + int64(c.uniqueRowIDInst)
shiftedIndex := int64(c.uniqueRowIDTotal)*c.rowID + int64(c.uniqueRowIDInstance)
returnIndex := (uint64(c.sourceID) << rowIDBits) ^ uint64(shiftedIndex)
c.uniqueRowIDInst++
(*evalCtx.Annotations).Set(cellInfoAddr, c)
c.uniqueRowIDInstance++
evalCtx.Annotations.Set(cellInfoAddr, c)
return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil
}

// Besides overriding, there are also counters that we want to keep track
// of as we walk through the expressions in a row (at datumRowConverter creation
// time). This will be handled by the sideEffect field: it will be called with an
// annotation passed in that changes the counter. In the case of unique_rowid, for
// example, we want to keep track of the total number of unique_rowid occurences
// example, we want to keep track of the total number of unique_rowid occurrences
// in a row.
type customFunc struct {
sideEffect func(annotations *tree.Annotations)
override *tree.FunctionDefinition
visitorSideEffect func(annotations *tree.Annotations)
override *tree.FunctionDefinition
}

// useDefaultBuiltin is a nil *customFunc used as the map value in
// supportedImportFuncOverrides for functions that are supported without an
// override; VisitPost returns the expression unchanged for such entries.
var useDefaultBuiltin *customFunc

// Given that imports can be retried and resumed, we want to
// ensure that the default functions return the same value given
// the same arguments, even on retries. Therefore we decide to support
Expand All @@ -110,18 +122,16 @@ type customFunc struct {
var supportedImportFuncOverrides = map[string]*customFunc{
// These methods can be supported given that we set the statement
// and transaction timestamp to be equal, i.e. the write timestamp.
"current_date": nil,
"current_timestamp": nil,
"localtimestamp": nil,
"now": nil,
"statement_timestamp": nil,
"timeofday": nil,
"transaction_timestamp": nil,
"current_date": useDefaultBuiltin,
"current_timestamp": useDefaultBuiltin,
"localtimestamp": useDefaultBuiltin,
"now": useDefaultBuiltin,
"statement_timestamp": useDefaultBuiltin,
"timeofday": useDefaultBuiltin,
"transaction_timestamp": useDefaultBuiltin,
"unique_rowid": {
sideEffect: func(annot *tree.Annotations) {
c := annot.Get(cellInfoAddr).(cellInfoAnnot)
c.uniqueRowIDTotal++
annot.Set(cellInfoAddr, c)
visitorSideEffect: func(annot *tree.Annotations) {
getCellInfoAnnotation(annot).uniqueRowIDTotal++
},
override: makeBuiltinOverride(
tree.FunDefs["unique_rowid"],
Expand Down Expand Up @@ -158,10 +168,12 @@ func (e *unsafeErrExpr) Eval(_ *tree.EvalContext) (tree.Datum, error) {
// visitor walks the tree and ensures that any expression in the tree
// that's not immutable is what we explicitly support.
type importDefaultExprVisitor struct {
err error
ctx context.Context
annot *tree.Annotations
semaCtx *tree.SemaContext
err error
ctx context.Context
annotations *tree.Annotations
semaCtx *tree.SemaContext
// The volatile flag will be set if there's at least one volatile
// function appearing in the default expression.
volatile bool
}

Expand All @@ -170,6 +182,9 @@ func (v *importDefaultExprVisitor) VisitPre(expr tree.Expr) (recurse bool, newEx
return v.err == nil, expr
}

// Named values for the volatile flag: overrideVolatile is what VisitPost
// assigns to the visitor's volatile field once an overridden function is
// found, and overrideImmutable is what SanitizeExprsForImport returns when
// the expression already sanitizes as immutable.
const (
	overrideImmutable = false
	overrideVolatile  = true
)

// VisitPost implements tree.Visitor interface.
func (v *importDefaultExprVisitor) VisitPost(expr tree.Expr) (newExpr tree.Expr) {
if v.err != nil {
Expand All @@ -182,16 +197,19 @@ func (v *importDefaultExprVisitor) VisitPost(expr tree.Expr) (newExpr tree.Expr)
if custom, isSafe := supportedImportFuncOverrides[resolvedFnName]; !isSafe {
v.err = errors.Newf(`function %s unsupported by IMPORT INTO`, resolvedFnName)
} else {
// No override exists, means it's okay to use the definitions given in builtin.go.
if custom == nil {
if custom == useDefaultBuiltin {
// No override exists, means it's okay to use the definitions given in
// builtin.go.
return expr
}
// Override exists, so we turn the volatile flag of the visitor to true.
// In addition, the sideEffect function needs to be called to update any
// relevant counter (e.g. the total number of occurrences of the unique_rowid
// function in an expression).
v.volatile = true
custom.sideEffect(v.annot)
// relevant counter (e.g. the total number of occurrences of the
// unique_rowid function in an expression).
v.volatile = overrideVolatile
if custom.visitorSideEffect != nil {
custom.visitorSideEffect(v.annotations)
}
funcExpr := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{FunctionReference: custom.override},
Type: fn.Type,
Expand Down Expand Up @@ -220,7 +238,7 @@ func SanitizeExprsForImport(
typedExpr, err := sqlbase.SanitizeVarFreeExpr(
ctx, expr, targetType, "import_default", &semaCtx, tree.VolatilityImmutable)
if err == nil {
return typedExpr, false, nil
return typedExpr, overrideImmutable, nil
}
// Now that the expressions are not immutable, we first check that they
// are of the correct type before checking for any unsupported functions
Expand All @@ -229,7 +247,7 @@ func SanitizeExprsForImport(
if err != nil {
return nil, false, unsafeExpressionError(err, "type checking error", expr.String())
}
v := &importDefaultExprVisitor{annot: evalCtx.Annotations}
v := &importDefaultExprVisitor{annotations: evalCtx.Annotations}
newExpr, _ := tree.WalkExpr(v, typedExpr)
if v.err != nil {
return nil, false, unsafeExpressionError(v.err, "expr walking error", expr.String())
Expand Down Expand Up @@ -448,7 +466,7 @@ func NewDatumRowConverter(
c := &DatumRowConverter{
tableDesc: immutDesc,
KvCh: kvCh,
EvalCtx: evalCtx,
EvalCtx: evalCtx.Copy(),
}

var targetColDescriptors []sqlbase.ColumnDescriptor
Expand Down Expand Up @@ -519,7 +537,7 @@ func NewDatumRowConverter(
}
hidden := -1
annot := make(tree.Annotations, 1)
annot.Set(cellInfoAddr, cellInfoAnnot{uniqueRowIDInst: 0})
annot.Set(cellInfoAddr, &cellInfoAnnotation{uniqueRowIDInstance: 0})
c.EvalCtx.Annotations = &annot
for i := range cols {
col := &cols[i]
Expand All @@ -532,7 +550,7 @@ func NewDatumRowConverter(
if col.DefaultExpr != nil {
// Placeholder for columns with default values that will be evaluated when
// each import row is being created.
typedExpr, volatile, err := SanitizeExprsForImport(ctx, evalCtx, defaultExprs[i], col.Type)
typedExpr, volatile, err := SanitizeExprsForImport(ctx, c.EvalCtx, defaultExprs[i], col.Type)
if err != nil {
// This expression may not be safe for import but we don't want to
// call the user out at this stage: targeted columns may not have
Expand All @@ -544,8 +562,10 @@ func NewDatumRowConverter(
}
} else {
c.defaultCache[i] = typedExpr
// This default expression isn't volatile, so we can evaluate once here
// and memoize it.
if !volatile {
c.defaultCache[i], err = c.defaultCache[i].Eval(evalCtx)
c.defaultCache[i], err = c.defaultCache[i].Eval(c.EvalCtx)
if err != nil {
return nil, errors.Wrapf(err, "error evaluating default expression")
}
Expand Down Expand Up @@ -573,22 +593,14 @@ func NewDatumRowConverter(

const rowIDBits = 64 - builtins.NodeIDBits

func setCellInfoAnnot(a *tree.Annotations, sourceID int32, rowIndex int64) {
cellAnnot := a.Get(cellInfoAddr).(cellInfoAnnot)
cellAnnot.uniqueRowIDInst = 0
cellAnnot.sourceID = sourceID
cellAnnot.rowID = rowIndex
a.Set(cellInfoAddr, cellAnnot)
}

// Row inserts kv operations into the current kv batch, and triggers a SendBatch
// if necessary.
func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex int64) error {
isTargetCol := func(i int) bool {
_, ok := c.IsTargetCol[i]
return ok
}
setCellInfoAnnot(c.EvalCtx.Annotations, sourceID, rowIndex)
getCellInfoAnnotation(c.EvalCtx.Annotations).Reset(sourceID, rowIndex)
for i := range c.cols {
col := &c.cols[i]
if !isTargetCol(i) && col.DefaultExpr != nil {
Expand Down

0 comments on commit 2af7e56

Please sign in to comment.