From 90b5a2006a04deebc17257fb018a27f4298dd100 Mon Sep 17 00:00:00 2001 From: anzoteh96 Date: Mon, 6 Jul 2020 14:28:31 -0400 Subject: [PATCH] importccl: support DEFAULT columns for current timestamp functions like now(), localtimestamp, transaction_timestamp, current_date This PR follows up from #50295 to add support for functions concerning current timestamps, which include now(), localtimestamp(), transaction_timestamp(), current_date(). This is achieved by injecting the walltime recorded at `importCtx` into evalCtx required for the evaluation of these functions. Partially addresses #48253. Release note (general change): timestamp functions are now supported by IMPORT INTO. --- pkg/ccl/importccl/import_stmt_test.go | 98 +++++++++++++++++++++++++-- pkg/ccl/importccl/read_import_base.go | 5 ++ pkg/sql/row/row_converter.go | 21 +++++- 3 files changed, 116 insertions(+), 8 deletions(-) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index c5d9110328be..815c48670d14 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -57,6 +57,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/jackc/pgx" "github.com/linkedin/goavro/v2" @@ -64,6 +65,28 @@ import ( "github.com/stretchr/testify/require" ) +// Checks that the timestamp is the same for all the rows, and that +// it lies between the startTime and the endTime. +func validTimestamp( + rows [][]string, colIndices int, startTime time.Time, endTime time.Time, timeNow time.Time, +) bool { + if len(rows) == 0 { + return true + } + targetTimeStr := "" + for _, row := range rows { + if targetTimeStr == "" { + targetTimeStr = row[colIndices] + } else { + if targetTimeStr != row[colIndices] { + return false + } + } + } + + return !timeNow.Before(startTime) && !timeNow.After(endTime) +} + func TestImportData(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2613,13 +2636,74 @@ func TestImportIntoCSV(t *testing.T) { fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`), fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL)) }) - t.Run("now-impure", func(t *testing.T) { - data = "1\n2" - sqlDB.Exec(t, `CREATE TABLE t(a INT, b TIMESTAMP DEFAULT now())`) - defer sqlDB.Exec(t, `DROP TABLE t`) - sqlDB.ExpectErr(t, - fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`), - fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL)) + t.Run("current-timestamp", func(t *testing.T) { + // TODO(anzoteh96): need to figure out a way for timeofday() validation + // because the format returned is different from the usually UTC format + // and it's stored as string in the database. + data = "1\n2\n3\n4\n5\n6" + testCases := []struct { + defaultExpr string + colType string + millisecond bool + }{ + { + defaultExpr: "current_date()", + colType: "DATE", + }, + { + defaultExpr: "current_timestamp()", + colType: "TIMESTAMP", + }, + { + defaultExpr: "current_timestamp", + colType: "TIMESTAMP", + millisecond: true, + }, + { + defaultExpr: "localtimestamp()", + colType: "TIMESTAMP", + }, + { + defaultExpr: "localtimestamp", + colType: "TIMESTAMP", + millisecond: true, + }, + { + defaultExpr: "now()", + colType: "TIMESTAMP", + }, + { + defaultExpr: "statement_timestamp()", + colType: "TIMESTAMP", + }, + { + defaultExpr: "transaction_timestamp()", + colType: "TIMESTAMP", + }, + } + + for _, test := range testCases { + if test.millisecond { + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(a INT, b %s DEFAULT %s(%d))`, test.colType, test.defaultExpr, 3)) + } else { + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(a INT, b %s DEFAULT %s)`, test.colType, test.defaultExpr)) + } + timeStart := timeutil.Now() + sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL)) + timeEnd := timeutil.Now() + if test.colType == "DATE" { + timeStart = timeStart.Truncate(24 * time.Hour) + timeEnd = timeEnd.Truncate(24 * time.Hour) + } else if test.millisecond { + timeStart = timeStart.Truncate((time.Millisecond)) + timeEnd = timeEnd.Truncate((time.Millisecond)) + } + var timeNow time.Time + sqlDB.QueryRow(t, `SELECT b from t`).Scan(&timeNow) + timeStr := sqlDB.QueryStr(t, `SELECT * from t`) + assert.True(t, validTimestamp(timeStr, 1, timeStart, timeEnd, timeNow), test.defaultExpr, timeStart, timeStr, timeEnd) + sqlDB.Exec(t, `DROP TABLE t`) + } }) t.Run("pgdump", func(t *testing.T) { data = "INSERT INTO t VALUES (1, 2), (3, 4)" diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go index 1d2340b547e0..c089157ac3b1 100644 --- a/pkg/ccl/importccl/read_import_base.go +++ b/pkg/ccl/importccl/read_import_base.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/cloudimpl" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -485,6 +486,10 @@ func runParallelImport( producer importRowProducer, consumer importRowConsumer, ) error { + sec := importCtx.walltime / int64(time.Second) + nsec := importCtx.walltime % int64(time.Second) + importCtx.evalCtx.TxnTimestamp = timeutil.Unix(sec, nsec) + importCtx.evalCtx.StmtTimestamp = timeutil.Unix(sec, nsec) batchSize := importCtx.batchSize if batchSize <= 0 { batchSize = parallelImporterReaderBatchSize diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 8e7dbda012c9..eff010f6ddfb 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -12,6 +12,7 @@ package row import ( "context" + "strings" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" @@ -23,6 +24,24 @@ import ( "github.com/cockroachdb/errors" ) +var supportedDefault = []string{ + "current_date()", "current_timestamp", + "localtimestamp", "now()", "statement_timestamp()", + "timeofday()", "transaction_timestamp()", +} + +func isSupportedDefault(ctx *tree.EvalContext, expr tree.TypedExpr, defaultStr string) bool { + if tree.IsConst(ctx, expr) { + return true + } + for _, supportedStr := range supportedDefault { + if strings.Contains(defaultStr, supportedStr) { + return true + } + } + return false +} + // KVInserter implements the putter interface. type KVInserter func(roachpb.KeyValue) @@ -374,7 +393,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in for i := range c.cols { col := &c.cols[i] if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil { - if !tree.IsConst(c.EvalCtx, c.defaultExprs[i]) { + if !isSupportedDefault(c.EvalCtx, c.defaultExprs[i], col.DefaultExprStr()) { // Check if the default expression is a constant expression as we do not // support non-constant default expressions for non-target columns in IMPORT INTO. //