Skip to content

Commit

Permalink
importccl: support DEFAULT columns for current timestamp functions
Browse files Browse the repository at this point in the history
like now(), localtimestamp, transaction_timestamp, current_date

This PR follows up from cockroachdb#50295 to add support for functions concerning
current timestamps, which include now(), localtimestamp(),
transaction_timestamp(), current_date(). This is achieved by
injecting the walltime recorded at `importCtx` into evalCtx required
for the evaluation of these functions.

Partially addresses cockroachdb#48253.

Release note (general change): timestamp functions are now supported
by IMPORT INTO.
  • Loading branch information
anzoteh96 committed Jul 14, 2020
1 parent 10f0c57 commit 90b5a20
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 8 deletions.
98 changes: 91 additions & 7 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,36 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx"
"github.com/linkedin/goavro/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Checks that the timestamp is the same for all the rows, and that
// it lies between the startTime and the endTime.
func validTimestamp(
rows [][]string, colIndices int, startTime time.Time, endTime time.Time, timeNow time.Time,
) bool {
if len(rows) == 0 {
return true
}
targetTimeStr := ""
for _, row := range rows {
if targetTimeStr == "" {
targetTimeStr = row[colIndices]
} else {
if targetTimeStr != row[colIndices] {
return false
}
}
}

return !timeNow.Before(startTime) && !timeNow.After(endTime)
}

func TestImportData(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -2613,13 +2636,74 @@ func TestImportIntoCSV(t *testing.T) {
fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
})
t.Run("now-impure", func(t *testing.T) {
data = "1\n2"
sqlDB.Exec(t, `CREATE TABLE t(a INT, b TIMESTAMP DEFAULT now())`)
defer sqlDB.Exec(t, `DROP TABLE t`)
sqlDB.ExpectErr(t,
fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
t.Run("current-timestamp", func(t *testing.T) {
// TODO(anzoteh96): need to figure out a way for timeofday() validation
// because the format returned is different from the usually UTC format
// and it's stored as string in the database.
data = "1\n2\n3\n4\n5\n6"
testCases := []struct {
defaultExpr string
colType string
millisecond bool
}{
{
defaultExpr: "current_date()",
colType: "DATE",
},
{
defaultExpr: "current_timestamp()",
colType: "TIMESTAMP",
},
{
defaultExpr: "current_timestamp",
colType: "TIMESTAMP",
millisecond: true,
},
{
defaultExpr: "localtimestamp()",
colType: "TIMESTAMP",
},
{
defaultExpr: "localtimestamp",
colType: "TIMESTAMP",
millisecond: true,
},
{
defaultExpr: "now()",
colType: "TIMESTAMP",
},
{
defaultExpr: "statement_timestamp()",
colType: "TIMESTAMP",
},
{
defaultExpr: "transaction_timestamp()",
colType: "TIMESTAMP",
},
}

for _, test := range testCases {
if test.millisecond {
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(a INT, b %s DEFAULT %s(%d))`, test.colType, test.defaultExpr, 3))
} else {
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(a INT, b %s DEFAULT %s)`, test.colType, test.defaultExpr))
}
timeStart := timeutil.Now()
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
timeEnd := timeutil.Now()
if test.colType == "DATE" {
timeStart = timeStart.Truncate(24 * time.Hour)
timeEnd = timeEnd.Truncate(24 * time.Hour)
} else if test.millisecond {
timeStart = timeStart.Truncate((time.Millisecond))
timeEnd = timeEnd.Truncate((time.Millisecond))
}
var timeNow time.Time
sqlDB.QueryRow(t, `SELECT b from t`).Scan(&timeNow)
timeStr := sqlDB.QueryStr(t, `SELECT * from t`)
assert.True(t, validTimestamp(timeStr, 1, timeStart, timeEnd, timeNow), test.defaultExpr, timeStart, timeStr, timeEnd)
sqlDB.Exec(t, `DROP TABLE t`)
}
})
t.Run("pgdump", func(t *testing.T) {
data = "INSERT INTO t VALUES (1, 2), (3, 4)"
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -485,6 +486,10 @@ func runParallelImport(
producer importRowProducer,
consumer importRowConsumer,
) error {
sec := importCtx.walltime / int64(time.Second)
nsec := importCtx.walltime % int64(time.Second)
importCtx.evalCtx.TxnTimestamp = timeutil.Unix(sec, nsec)
importCtx.evalCtx.StmtTimestamp = timeutil.Unix(sec, nsec)
batchSize := importCtx.batchSize
if batchSize <= 0 {
batchSize = parallelImporterReaderBatchSize
Expand Down
21 changes: 20 additions & 1 deletion pkg/sql/row/row_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package row

import (
"context"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
Expand All @@ -23,6 +24,24 @@ import (
"github.com/cockroachdb/errors"
)

var supportedDefault = []string{
"current_date()", "current_timestamp",
"localtimestamp", "now()", "statement_timestamp()",
"timeofday()", "transaction_timestamp()",
}

func isSupportedDefault(ctx *tree.EvalContext, expr tree.TypedExpr, defaultStr string) bool {
if tree.IsConst(ctx, expr) {
return true
}
for _, supportedStr := range supportedDefault {
if strings.Contains(defaultStr, supportedStr) {
return true
}
}
return false
}

// KVInserter implements the putter interface.
type KVInserter func(roachpb.KeyValue)

Expand Down Expand Up @@ -374,7 +393,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
for i := range c.cols {
col := &c.cols[i]
if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil {
if !tree.IsConst(c.EvalCtx, c.defaultExprs[i]) {
if !isSupportedDefault(c.EvalCtx, c.defaultExprs[i], col.DefaultExprStr()) {
// Check if the default expression is a constant expression as we do not
// support non-constant default expressions for non-target columns in IMPORT INTO.
//
Expand Down

0 comments on commit 90b5a20

Please sign in to comment.