Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
51390: importccl: support DEFAULT columns for current timestamp functions (like now()) for IMPORT INTO r=Anzoteh96 a=Anzoteh96

This PR follows up from cockroachdb#50295 to add support for functions concerning current time as default expression. The functions supported are: 
`current_date()`, `current_timestamp()`, `localtimestamp()`, `now()`, `statement_timestamp()`, `timeofday()`, `transaction_timestamp()`. 
These functions have values that stay the same throughout different rows of an IMPORT: they record the transaction timestamp. Notice that `clock_timestamp()`is not supported as it records the time of a row being inserted, and therefore varies through different rows of an IMPORT. 

This is achieved by injecting the walltime recorded at `importCtx` into evalCtx required for the evaluation of these functions.

In addition, this PR also uses a visitor method to determine whether a default expression is supported by IMPORT INTO. Specifically, it checks that all function expressions (and arguments) of the default expression, when represented as a tree, contain either immutable expressions or functions we support.
Partially addresses cockroachdb#48253.

Release note (general change): current timestamp functions are now supported by IMPORT INTO.

Co-authored-by: anzoteh96 <[email protected]>
  • Loading branch information
craig[bot] and anzoteh96 committed Jul 21, 2020
2 parents 8354896 + 44182ba commit c78c517
Show file tree
Hide file tree
Showing 3 changed files with 287 additions and 32 deletions.
11 changes: 11 additions & 0 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand All @@ -51,6 +52,14 @@ func (cp *readImportDataProcessor) OutputTypes() []*types.T {
return csvOutputTypes
}

func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {
sec := walltime / int64(time.Second)
nsec := walltime % int64(time.Second)
unixtime := timeutil.Unix(sec, nsec)
ctx.StmtTimestamp = unixtime
ctx.TxnTimestamp = unixtime
}

func newReadImportDataProcessor(
flowCtx *execinfra.FlowCtx,
processorID int32,
Expand Down Expand Up @@ -127,6 +136,8 @@ func makeInputConverter(
return nil
}

injectTimeIntoEvalCtx(evalCtx, spec.WalltimeNanos)

if evalCtx.Txn != nil {
// If we have a transaction, then use it.
if err := installTypeMetadata(evalCtx); err != nil {
Expand Down
163 changes: 149 additions & 14 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx"
"github.com/linkedin/goavro/v2"
Expand Down Expand Up @@ -2621,22 +2622,156 @@ func TestImportIntoCSV(t *testing.T) {
{"35", "102"},
{"67", "102"}})
})
t.Run("sequence-impure", func(t *testing.T) {
t.Run("unsupported-functions", func(t *testing.T) {
data = "1\n2"
sqlDB.Exec(t, `CREATE SEQUENCE testseq`)
defer sqlDB.Exec(t, `DROP TABLE t`)
sqlDB.Exec(t, `CREATE TABLE t(a INT, b INT DEFAULT nextval('testseq'))`)
sqlDB.ExpectErr(t,
fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
testCases := []struct {
name string
defaultExpr string
colType string
seqName string
}{
{
name: "nextval",
defaultExpr: "nextval('testseq')",
colType: "INT",
seqName: "testseq",
},
{
name: "random",
defaultExpr: "random()",
colType: "FLOAT",
},
{
name: "random_plus_timestamp",
defaultExpr: "(100*random())::int + current_timestamp()::int",
colType: "INT",
},
{
name: "deep_nesting",
defaultExpr: "(1 + 2 + (100 * round(3 + random())::int)) * 5 + 3",
colType: "INT",
},
}
for _, test := range testCases {
if test.seqName != "" {
defer sqlDB.Exec(t, fmt.Sprintf(`DROP SEQUENCE IF EXISTS %s`, test.seqName))
}
t.Run(test.name, func(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)
if test.seqName != "" {
sqlDB.Exec(t, fmt.Sprintf(`CREATE SEQUENCE %s`, test.seqName))
}
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(a INT, b %s DEFAULT %s)`, test.colType, test.defaultExpr))
sqlDB.ExpectErr(t,
fmt.Sprintf(`unsafe for import`),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
})
}
})
t.Run("now-impure", func(t *testing.T) {
data = "1\n2"
sqlDB.Exec(t, `CREATE TABLE t(a INT, b TIMESTAMP DEFAULT now())`)
defer sqlDB.Exec(t, `DROP TABLE t`)
sqlDB.ExpectErr(t,
fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
t.Run("current-timestamp", func(t *testing.T) {
data = "1\n2\n3\n4\n5\n6"
testCases := []struct {
name string
defaultExpr string
colType string
truncate time.Duration
}{
{
name: "current_date",
defaultExpr: "current_date()",
colType: "DATE",
truncate: 24 * time.Hour,
},
{
name: "current_timestamp",
defaultExpr: "current_timestamp()",
colType: "TIMESTAMP",
},
{
name: "current_timestamp_with_precision",
defaultExpr: "current_timestamp(3)",
colType: "TIMESTAMP",
truncate: time.Millisecond,
},
{
name: "current_timestamp_as_int",
defaultExpr: "current_timestamp()::int",
colType: "INT",
},
{
name: "localtimestamp",
defaultExpr: "localtimestamp()::TIMESTAMPTZ",
colType: "TIMESTAMPTZ",
},
{
name: "localtimestamp_with_precision",
defaultExpr: "localtimestamp(3)",
colType: "TIMESTAMP",
truncate: time.Millisecond,
},
{
name: "localtimestamp_with_expr_precision",
defaultExpr: "localtimestamp(1+2+3)",
colType: "TIMESTAMP",
},
{
name: "now",
defaultExpr: "now()",
colType: "TIMESTAMP",
},
{
name: "now-case-insensitive",
defaultExpr: "NoW()",
colType: "DATE",
},
{
name: "pg_catalog.now",
defaultExpr: "pg_catalog.now()",
colType: "DATE",
},
{
name: "statement_timestamp",
defaultExpr: "statement_timestamp()",
colType: "TIMESTAMP",
},
{
name: "transaction_timestamp",
defaultExpr: "transaction_timestamp()",
colType: "TIMESTAMP",
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(a INT, b %s DEFAULT %s)`, test.colType, test.defaultExpr))
minTs := timeutil.Now()
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
maxTs := timeutil.Now()
if test.truncate != 0 {
minTs = minTs.Truncate(test.truncate)
maxTs = maxTs.Truncate(test.truncate)
}

var numBadRows int
if test.colType == "INT" {
minTsInt := minTs.Unix()
maxTsInt := maxTs.Unix()
sqlDB.QueryRow(t,
`SELECT count(*) FROM t WHERE b !=(SELECT b FROM t WHERE a=1) OR b IS NULL or b < $1 or b > $2`,
minTsInt,
maxTsInt,
).Scan(&numBadRows)
} else {
sqlDB.QueryRow(t,
`SELECT count(*) FROM t WHERE b !=(SELECT b FROM t WHERE a=1) OR b IS NULL or b < $1 or b > $2`,
minTs,
maxTs,
).Scan(&numBadRows)
}
require.Equal(t, 0, numBadRows)
})
}
})
t.Run("pgdump", func(t *testing.T) {
data = "INSERT INTO t VALUES (1, 2), (3, 4)"
Expand Down
Loading

0 comments on commit c78c517

Please sign in to comment.