Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
52359: sql: drop the "experimental" in experimental_follower_read_timestamp r=andreimatei a=andreimatei

The known badness with follower reads was fixed - most recently #49391, #44053
and #44878.

At this point, the "experimental" prefix doesn't serve any purpose, and
it unnecessarily raises questions.

The previous name stays as an alias for backwards compatibility.

Fixes #45243

Release note: The experimental_follower_read_timestamp() function was
renamed to follower_read_timestamp(), signifying more confidence in
CRDB's follower read implementation. The previous name remains a
supported alias.

52534: sql:  add notice for Alter TYPE ADD VALUE IF NOT EXIST command r=rohany a=himanshuchawla009

Fixes: #52327

Added a notice message similar to postgres if user tries to add a value in enum which already exists using IF NOT EXIST command.

> root@:26257/defaultdb> alter type color ADD VALUE IF NOT EXISTS 'black';
ALTER TYPE

> root@:26257/defaultdb> alter type color ADD VALUE IF NOT EXISTS 'black';
NOTICE: enum label "black" already exists, skipping 

However, I noticed similar behaviour in other command like ALTER TABLE command. Altering existing table with if not exist 
command doesn't shows any notice.

Release note (sql change): Show notice to user when user tries to add value which already exists in enum .



52547: export/csv: prepend a unique id to file name of csv exports r=shermanCRL a=shermanCRL

Addresses #50580

When exporting CSV files, a failed & repeated export will use the same
file names. This can result in an error if an overwrite is disallowed,
but more importantly, it can result in a directory containing files
from different export runs. This can represent inconsistent data if the
previous export was not deleted.

This commit prepends a unique export ID to the file name. The ID
is the queryID from the sql.Statement struct. This choice seems to
guarantee the most uniqueness. It's 128 bits, and encoded as hex.

Other ID options considered were random int, timestamp, and
transaction ID. Each of these has a small risk of collision.

Related tests were updated and are (should be) passing.

Release note (enterprise change): Exported CSV files are now prepended
with a long unique ID. This can help to mitigate situations where
multiple export runs are written to the same directory, resulting in
mixed data. This change does not prevent mixed data; rather, it makes
it possible to identify files from distinct runs, so that an operator
can clean up.

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: himanshuchawla009 <[email protected]>
Co-authored-by: Matt Sherman <[email protected]>
  • Loading branch information
4 people committed Aug 10, 2020
4 parents 898bca7 + 7284376 + 98e7deb + 59dae5d commit 7e4e0bf
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 100 deletions.
18 changes: 10 additions & 8 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -380,14 +380,7 @@ significant than <code>element</code> to zero (or one, for day and month)</p>
<p>Compatible elements: millennium, century, decade, year, quarter, month,
week, day, hour, minute, second, millisecond, microsecond.</p>
</span></td></tr>
<tr><td><a name="experimental_follower_read_timestamp"></a><code>experimental_follower_read_timestamp() &rarr; <a href="timestamp.html">timestamptz</a></code></td><td><span class="funcdesc"><p>Returns a timestamp which is very likely to be safe to perform
against a follower replica.</p>
<p>This function is intended to be used with an AS OF SYSTEM TIME clause to perform
historical reads against a time which is recent but sufficiently old for reads
to be performed against the closest replica as opposed to the currently
leaseholder for a given range.</p>
<p>Note that this function requires an enterprise license on a CCL distribution to
return without an error.</p>
<tr><td><a name="experimental_follower_read_timestamp"></a><code>experimental_follower_read_timestamp() &rarr; <a href="timestamp.html">timestamptz</a></code></td><td><span class="funcdesc"><p>Same as follower_read_timestamp. This name is deprecated.</p>
</span></td></tr>
<tr><td><a name="experimental_strftime"></a><code>experimental_strftime(input: <a href="date.html">date</a>, extract_format: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>From <code>input</code>, extracts and formats the time as identified in <code>extract_format</code> using standard <code>strftime</code> notation (though not all formatting is supported).</p>
</span></td></tr>
Expand Down Expand Up @@ -428,6 +421,15 @@ timezone, timezone_hour, timezone_minute</p>
Compatible elements: hour, minute, second, millisecond, microsecond.
This is deprecated in favor of <code>extract</code> which supports duration.</p>
</span></td></tr>
<tr><td><a name="follower_read_timestamp"></a><code>follower_read_timestamp() &rarr; <a href="timestamp.html">timestamptz</a></code></td><td><span class="funcdesc"><p>Returns a timestamp which is very likely to be safe to perform
against a follower replica.</p>
<p>This function is intended to be used with an AS OF SYSTEM TIME clause to perform
historical reads against a time which is recent but sufficiently old for reads
to be performed against the closest replica as opposed to the currently
leaseholder for a given range.</p>
<p>Note that this function requires an enterprise license on a CCL distribution to
return without an error.</p>
</span></td></tr>
<tr><td><a name="localtimestamp"></a><code>localtimestamp() &rarr; <a href="date.html">date</a></code></td><td><span class="funcdesc"><p>Returns the time of the current transaction.</p>
<p>The value is based on a timestamp picked when the transaction starts
and which stays constant throughout the transaction. This timestamp
Expand Down
107 changes: 71 additions & 36 deletions pkg/ccl/importccl/exportcsv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
package importccl_test

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
Expand Down Expand Up @@ -186,6 +186,18 @@ func TestExportJoin(t *testing.T) {
sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/join' FROM SELECT * FROM t, t as u`)
}

func readFileByGlob(t *testing.T, pattern string) []byte {
paths, err := filepath.Glob(pattern)
require.NoError(t, err)

require.Equal(t, 1, len(paths))

result, err := ioutil.ReadFile(paths[0])
require.NoError(t, err)

return result
}

func TestExportOrder(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -196,30 +208,58 @@ func TestExportOrder(t *testing.T) {
defer srv.Stopper().Stop(context.Background())
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `create table foo (i int primary key, x int, y int, z int, index (y))`)
sqlDB.Exec(t, `insert into foo values (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x INT, y INT, z INT, INDEX (y))`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' FROM SELECT * FROM foo ORDER BY y ASC LIMIT 2`)
content := readFileByGlob(t, filepath.Join(dir, "order", "export*-n1.0.csv"))

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' from select * from foo order by y asc limit 2`)
content, err := ioutil.ReadFile(filepath.Join(dir, "order", "n1.0.csv"))
if err != nil {
t.Fatal(err)
}
if expected, got := "3,32,1,34\n2,22,2,24\n", string(content); expected != got {
t.Fatalf("expected %q, got %q", expected, got)
}
}

func TestExportUniqueness(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
dir, cleanupDir := testutils.TempDir(t)
defer cleanupDir()

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir})
defer srv.Stopper().Stop(context.Background())
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x INT, y INT, z INT, INDEX (y))`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)

const stmt = `EXPORT INTO CSV 'nodelocal://0/' WITH chunk_rows=$1 FROM SELECT * FROM foo`

sqlDB.Exec(t, stmt, 2)
dir1, err := ioutil.ReadDir(dir)
require.NoError(t, err)

sqlDB.Exec(t, stmt, 2)
dir2, err := ioutil.ReadDir(dir)
require.NoError(t, err)

require.Equal(t, 2*len(dir1), len(dir2), "second export did not double the number of files")
}

func TestExportUserDefinedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
baseDir, cleanup := testutils.TempDir(t)
dir, cleanup := testutils.TempDir(t)
defer cleanup()

tc := testcluster.StartTestCluster(
t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: dir}})
defer tc.Stopper().Stop(ctx)

conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

// Set up some initial state for the tests.
sqlDB.Exec(t, `
SET experimental_enable_enums = true;
Expand All @@ -232,23 +272,27 @@ INSERT INTO greeting_table VALUES ('hello', 'hello'), ('hi', 'hi');
expected string
}{
{
stmt: "EXPORT INTO CSV 'nodelocal://0/test/' FROM (SELECT 'hello':::greeting, 'hi':::greeting)",
stmt: "EXPORT INTO CSV 'nodelocal://0/%s/' FROM (SELECT 'hello':::greeting, 'hi':::greeting)",
expected: "hello,hi\n",
},
{
stmt: "EXPORT INTO CSV 'nodelocal://0/test/' FROM TABLE greeting_table",
stmt: "EXPORT INTO CSV 'nodelocal://0/%s/' FROM TABLE greeting_table",
expected: "hello,hello\nhi,hi\n",
},
{
stmt: "EXPORT INTO CSV 'nodelocal://0/test/' FROM (SELECT x, y, enum_first(x) FROM greeting_table)",
stmt: "EXPORT INTO CSV 'nodelocal://0/%s/' FROM (SELECT x, y, enum_first(x) FROM greeting_table)",
expected: "hello,hello,hello\nhi,hi,hello\n",
},
}
for _, test := range tests {
sqlDB.Exec(t, test.stmt)
for i, test := range tests {
path := fmt.Sprintf("test%d", i)
stmt := fmt.Sprintf(test.stmt, path)

sqlDB.Exec(t, stmt)

// Read the dumped file.
contents, err := ioutil.ReadFile(filepath.Join(baseDir, "test", "n1.0.csv"))
require.NoError(t, err)
contents := readFileByGlob(t, filepath.Join(dir, path, "export*-n1.0.csv"))

require.Equal(t, test.expected, string(contents))
}
}
Expand All @@ -269,28 +313,19 @@ func TestExportOrderCompressed(t *testing.T) {
defer srv.Stopper().Stop(context.Background())
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `create table foo (i int primary key, x int, y int, z int, index (y))`)
sqlDB.Exec(t, `insert into foo values (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x INT, y INT, z INT, INDEX (y))`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' with compression = gzip from select * from foo order by y asc limit 2`)
fi, err := os.Open(filepath.Join(dir, "order", "n1.0.csv.gz"))
defer close(fi)

if err != nil {
t.Fatal(err)
}
compressed := readFileByGlob(t, filepath.Join(dir, "order", "export*-n1.0.csv.gz"))

gzipReader, err := gzip.NewReader(fi)
gzipReader, err := gzip.NewReader(bytes.NewReader(compressed))
defer close(gzipReader)

if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

content, err := ioutil.ReadAll(gzipReader)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if expected, got := "3,32,1,34\n2,22,2,24\n", string(content); expected != got {
t.Fatalf("expected %q, got %q", expected, got)
Expand All @@ -300,18 +335,18 @@ func TestExportOrderCompressed(t *testing.T) {
func TestExportShow(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

dir, cleanupDir := testutils.TempDir(t)
defer cleanupDir()

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir})
defer srv.Stopper().Stop(context.Background())

sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/show' FROM SELECT * FROM [SHOW DATABASES] ORDER BY database_name`)
content, err := ioutil.ReadFile(filepath.Join(dir, "show", "n1.0.csv"))
if err != nil {
t.Fatal(err)
}
content := readFileByGlob(t, filepath.Join(dir, "show", "export*-n1.0.csv"))

if expected, got := "defaultdb\npostgres\nsystem\n", string(content); expected != got {
t.Fatalf("expected %q, got %q", expected, got)
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1886,12 +1886,15 @@ IMPORT TABLE import_with_db_privs (a INT8 PRIMARY KEY, b STRING) CSV DATA (%s)`,
func TestExportImportRoundTrip(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
baseDir, cleanup := testutils.TempDir(t)
defer cleanup()

tc := testcluster.StartTestCluster(
t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
defer tc.Stopper().Stop(ctx)

conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

Expand All @@ -1905,19 +1908,19 @@ func TestExportImportRoundTrip(t *testing.T) {
// with a unique directory name per run.
{
stmts: `EXPORT INTO CSV 'nodelocal://0/%[1]s' FROM SELECT ARRAY['a', 'b', 'c'];
IMPORT TABLE t (x TEXT[]) CSV DATA ('nodelocal://0/%[1]s/n1.0.csv')`,
IMPORT TABLE t (x TEXT[]) CSV DATA ('nodelocal://0/%[1]s/export*-n1.0.csv')`,
tbl: "t",
expected: `SELECT ARRAY['a', 'b', 'c']`,
},
{
stmts: `EXPORT INTO CSV 'nodelocal://0/%[1]s' FROM SELECT ARRAY[b'abc', b'\141\142\143', b'\x61\x62\x63'];
IMPORT TABLE t (x BYTES[]) CSV DATA ('nodelocal://0/%[1]s/n1.0.csv')`,
IMPORT TABLE t (x BYTES[]) CSV DATA ('nodelocal://0/%[1]s/export*-n1.0.csv')`,
tbl: "t",
expected: `SELECT ARRAY[b'abc', b'\141\142\143', b'\x61\x62\x63']`,
},
{
stmts: `EXPORT INTO CSV 'nodelocal://0/%[1]s' FROM SELECT 'dog' COLLATE en;
IMPORT TABLE t (x STRING COLLATE en) CSV DATA ('nodelocal://0/%[1]s/n1.0.csv')`,
IMPORT TABLE t (x STRING COLLATE en) CSV DATA ('nodelocal://0/%[1]s/export*-n1.0.csv')`,
tbl: "t",
expected: `SELECT 'dog' COLLATE en`,
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
skip.UnderShort(t)

ctx := context.Background()
// The test uses experimental_follower_read_timestamp().
// The test uses follower_read_timestamp().
defer utilccl.TestingEnableEnterprise()()

historicalQuery := `SELECT * FROM test AS OF SYSTEM TIME experimental_follower_read_timestamp() WHERE k=2`
historicalQuery := `SELECT * FROM test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2`
recCh := make(chan tracing.Recording, 1)

var n2Addr, n3Addr syncutil.AtomicString
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
n1.Exec(t, `CREATE TABLE test (k INT PRIMARY KEY)`)
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,2], 1)`)
// Speed up closing of timestamps, as we'll in order to be able to use
// experimental_follower_read_timestamp().
// follower_read_timestamp().
// Every 0.2s we'll close the timestamp from 0.4s ago. We'll attempt follower reads
// for anything below 0.4s * (1 + 0.5 * 20) = 4.4s.
n1.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.4s'`)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/as_of
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ CREATE TABLE t (i INT)
statement ok
INSERT INTO t VALUES (2)

statement error pq: relation "t" does not exist
SELECT * FROM t AS OF SYSTEM TIME follower_read_timestamp()

statement error pq: relation "t" does not exist
SELECT * FROM t AS OF SYSTEM TIME experimental_follower_read_timestamp()
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/follower_reads.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
return func() error {
nodeDB := conns[node-1]
r := nodeDB.QueryRowContext(ctx, "SELECT v FROM test.test AS OF SYSTEM "+
"TIME experimental_follower_read_timestamp() WHERE k = $1", k)
"TIME follower_read_timestamp() WHERE k = $1", k)
var got int64
if err := r.Scan(&got); err != nil {
// Ignore errors due to cancellation.
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/alter_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -74,6 +75,23 @@ func (n *alterTypeNode) startExec(params runParams) error {
func (p *planner) addEnumValue(
ctx context.Context, n *alterTypeNode, node *tree.AlterTypeAddValue,
) error {
if n.desc.Kind != descpb.TypeDescriptor_ENUM {
return pgerror.Newf(pgcode.WrongObjectType, "%q is not an enum", n.desc.Name)
}
// See if the value already exists in the enum or not.
for _, member := range n.desc.EnumMembers {
if member.LogicalRepresentation == node.NewVal {
if node.IfNotExists {
p.SendClientNotice(
ctx,
pgnotice.Newf("enum label %q already exists, skipping", node.NewVal),
)
return nil
}
return pgerror.Newf(pgcode.DuplicateObject, "enum label %q already exists", node.NewVal)
}
}

if err := n.desc.AddEnumValue(node); err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3443,10 +3443,10 @@ func (dsp *DistSQLPlanner) createPlanForExport(
}

core := execinfrapb.ProcessorCoreUnion{CSVWriter: &execinfrapb.CSVWriterSpec{
Destination: n.fileName,
NamePattern: exportFilePatternDefault,
Destination: n.destination,
NamePattern: n.fileNamePattern,
Options: n.csvOpts,
ChunkRows: int64(n.chunkSize),
ChunkRows: int64(n.chunkRows),
CompressionCodec: n.fileCompression,
}}

Expand Down
Loading

0 comments on commit 7e4e0bf

Please sign in to comment.