Skip to content

Commit

Permalink
Merge #106131
Browse files Browse the repository at this point in the history
106131: pgrepl: parse replication statements + handle IDENTIFY_SYSTEM r=cucaroach a=otan

Informs: #105130

## pgwire: ban extended protocol with REPLICATION protocol

In line with PostgreSQL, we prohibit the use of the extended protocol
when the replication protocol is in effect.

Release note: None

## pgrepl: implement IDENTIFY_SYSTEM command

This commit implements IDENTIFY_SYSTEM in the replication protocol,
which is used to retrieve metadata about replication state.

Most of this commit involves changing assumptions throughout the
codebase there is 1 parser - there is 2 if replication mode is enabled.
We've also had to change the parser to return `statements.Statement`
instead of `pgrepltree.ReplicationStatement` for compatibility with
`parser.Parse...`.

I've left the `LSN` fields as placeholders for now as we finalise on
what we'll eventually call it.

Release note: None

## sql: no-op SHOW SYNTAX in replication mode

Currently on the CLI, the command must parse on the server for it to
run. For replication related commands, we have not implemented `SHOW
SYNTAX` yet. This is a bit of effort and seems outside the scope of what
we want for now.

For now, handle replication commands by no-oping them.

Release note: None



Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
craig[bot] and otan committed Aug 8, 2023
2 parents e5d064d + 1cd0a2a commit 8e614fc
Show file tree
Hide file tree
Showing 29 changed files with 488 additions and 15 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,7 @@ GO_TARGETS = [
"//pkg/sql/parser:parser",
"//pkg/sql/parser:parser_test",
"//pkg/sql/pgrepl/lsn:lsn",
"//pkg/sql/pgrepl/lsnutil:lsnutil",
"//pkg/sql/pgrepl/pgreplparser:pgreplparser",
"//pkg/sql/pgrepl/pgreplparser:pgreplparser_test",
"//pkg/sql/pgrepl/pgrepltree:pgrepltree",
Expand Down
7 changes: 7 additions & 0 deletions pkg/acceptance/generated_cli_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions pkg/cli/interactive_tests/test_replication_protocol.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#! /usr/bin/env expect -f

source [file join [file dirname $argv0] common.tcl]

start_server $argv

start_test "Ensure that replication mode works as expected in the sql shell"

# Spawn a sql shell.
spawn /bin/bash

send "$argv sql --url `cat server_url`'\&replication=database' -e 'IDENTIFY_SYSTEM'\r"
eexpect "(1 row)"

send_eof
eexpect eof

end_test

stop_server $argv
4 changes: 4 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ go_library(
"grant_revoke_system.go",
"grant_role.go",
"group.go",
"identify_system.go",
"index_backfiller.go",
"index_join.go",
"index_split_scatter.go",
Expand Down Expand Up @@ -442,6 +443,9 @@ go_library(
"//pkg/sql/paramparse",
"//pkg/sql/parser",
"//pkg/sql/parser/statements",
"//pkg/sql/pgrepl/lsn",
"//pkg/sql/pgrepl/lsnutil",
"//pkg/sql/pgrepl/pgrepltree",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/catalog/colinfo/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,11 @@ const RangesExtraRenders = `
coalesce((crdb_internal.range_stats(start_key)->>'range_key_bytes')::INT, 0) +
coalesce((crdb_internal.range_stats(start_key)->>'range_val_bytes')::INT, 0) AS range_size
`

// IdentifySystemColumns is the schema for IDENTIFY_SYSTEM.
var IdentifySystemColumns = ResultColumns{
{Name: "systemid", Typ: types.String},
{Name: "timeline", Typ: types.Int4},
{Name: "xlogpos", Typ: types.String},
{Name: "dbname", Typ: types.String},
}
2 changes: 2 additions & 0 deletions pkg/sql/delegate/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ go_library(
"//pkg/sql/oidext",
"//pkg/sql/opt/cat",
"//pkg/sql/parser",
"//pkg/sql/pgrepl/pgreplparser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/privilege",
"//pkg/sql/roleoption",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqltelemetry",
"//pkg/sql/syntheticprivilege",
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/delegate/show_syntax.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/lexbase"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgrepl/pgreplparser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
)

// delegateShowSyntax implements SHOW SYNTAX. This statement is usually handled
Expand All @@ -41,6 +43,16 @@ func (d *delegator) delegateShowSyntax(n *tree.ShowSyntax) (tree.Statement, erro
colinfo.ShowSyntaxColumns[0].Name, colinfo.ShowSyntaxColumns[1].Name,
)

// For replication based statements, return nothing for now.
if d.evalCtx.SessionData().ReplicationMode != sessiondatapb.ReplicationMode_REPLICATION_MODE_DISABLED &&
pgreplparser.IsReplicationProtocolCommand(n.Statement) {
return d.parse(fmt.Sprintf(
`SELECT '' AS %s, '' AS %s FROM generate_series(0, -1) x`,
colinfo.ShowSyntaxColumns[0].Name,
colinfo.ShowSyntaxColumns[1].Name,
))
}

comma := ""
// TODO(knz): in the call below, reportErr is nil although we might
// want to be able to capture (and report) these errors as well.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2152,6 +2152,7 @@ type SessionArgs struct {
User username.SQLUsername
IsSuperuser bool
IsSSL bool
ReplicationMode sessiondatapb.ReplicationMode
SystemIdentity username.SQLUsername
SessionDefaults SessionDefaults
CustomOptionSessionDefaults SessionDefaults
Expand Down
66 changes: 66 additions & 0 deletions pkg/sql/identify_system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/pgrepl/lsn"
"github.com/cockroachdb/cockroach/pkg/sql/pgrepl/lsnutil"
"github.com/cockroachdb/cockroach/pkg/sql/pgrepl/pgrepltree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

type identifySystemNode struct {
optColumnsSlot
clusterID string
database string
lsn lsn.LSN
shown bool
}

func (s *identifySystemNode) startExec(params runParams) error {
return nil
}

func (s *identifySystemNode) Next(params runParams) (bool, error) {
if s.shown {
return false, nil
}
s.shown = true
return true, nil
}

func (s *identifySystemNode) Values() tree.Datums {
db := tree.DNull
if s.database != "" {
db = tree.NewDString(s.database)
}
return tree.Datums{
tree.NewDString(s.clusterID),
tree.NewDInt(1), // timeline
tree.NewDString(s.lsn.String()),
db,
}
}

func (s *identifySystemNode) Close(ctx context.Context) {}

func (p *planner) IdentifySystem(
ctx context.Context, n *pgrepltree.IdentifySystem,
) (planNode, error) {
return &identifySystemNode{
// TODO(#105130): correctly populate this field.
lsn: lsnutil.HLCToLSN(p.Txn().ReadTimestamp()),
clusterID: p.ExecCfg().NodeInfo.LogicalClusterID().String(),
database: p.SessionData().Database,
}, nil
}
5 changes: 5 additions & 0 deletions pkg/sql/opaque.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/optbuilder"
"github.com/cockroachdb/cockroach/pkg/sql/pgrepl/pgrepltree"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
Expand Down Expand Up @@ -281,6 +282,8 @@ func planOpaque(ctx context.Context, p *planner, stmt tree.Statement) (planNode,
return p.Truncate(ctx, n)
case *tree.Unlisten:
return p.Unlisten(ctx, n)
case *pgrepltree.IdentifySystem:
return p.IdentifySystem(ctx, n)
case tree.CCLOnlyStatement:
plan, err := p.maybePlanHook(ctx, stmt)
if plan == nil && err == nil {
Expand Down Expand Up @@ -397,6 +400,8 @@ func init() {
&tree.Truncate{},
&tree.Unlisten{},

&pgrepltree.IdentifySystem{},

// CCL statements (without Export which has an optimizer operator).
&tree.AlterBackup{},
&tree.AlterBackupSchedule{},
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/pgrepl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ go_test(
name = "pgrepl_test",
srcs = [
"connect_test.go",
"extended_protocol_test.go",
"main_test.go",
],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
deps = [
"//pkg/base",
"//pkg/ccl",
Expand All @@ -15,13 +17,17 @@ go_test(
"//pkg/security/username",
"//pkg/server",
"//pkg/sql/pgwire/pgcode",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_jackc_pgx_v5//:pgx",
"@com_github_jackc_pgx_v5//pgconn",
"@com_github_jackc_pgx_v5//pgproto3",
"@com_github_stretchr_testify//require",
],
)
2 changes: 1 addition & 1 deletion pkg/sql/pgrepl/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestReplicationConnect(t *testing.T) {
}
require.NoError(t, err)
var val string
require.NoError(t, conn.QueryRow(ctx, "SELECT current_setting('replication')").Scan(&val))
require.NoError(t, conn.QueryRow(ctx, "SELECT current_setting('replication')", pgx.QueryExecModeSimpleProtocol).Scan(&val))
require.Equal(t, tc.expectedSessionVar, val)
})
}
Expand Down
103 changes: 103 additions & 0 deletions pkg/sql/pgrepl/extended_protocol_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package pgrepl

import (
"context"
"net/url"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/stretchr/testify/require"
)

// TestExtendedProtocolDisabled ensures the extended protocol is disabled
// during replication mode.
func TestExtendedProtocolDisabled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer srv.Stopper().Stop(context.Background())
s := srv.ApplicationLayer()

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE USER testuser LOGIN REPLICATION`)

pgURL, cleanup := sqlutils.PGUrl(t, s.AdvSQLAddr(), "pgrepl_extended_protocol_test", url.User(username.TestUser))
defer cleanup()

cfg, err := pgconn.ParseConfig(pgURL.String())
require.NoError(t, err)
cfg.RuntimeParams["replication"] = "database"
ctx := context.Background()

conn, err := pgconn.ConnectConfig(ctx, cfg)
require.NoError(t, err)
fe := conn.Frontend()

for _, tc := range []struct {
desc string
msg []pgproto3.FrontendMessage
}{
{desc: "parse", msg: []pgproto3.FrontendMessage{&pgproto3.Parse{Name: "a", Query: "SELECT 1"}}},
{desc: "bind", msg: []pgproto3.FrontendMessage{&pgproto3.Bind{}}},
{desc: "parse and bind", msg: []pgproto3.FrontendMessage{
&pgproto3.Parse{Name: "a", Query: "SELECT 1"},
&pgproto3.Bind{},
}},
{desc: "describe", msg: []pgproto3.FrontendMessage{&pgproto3.Describe{Name: "a"}}},
{desc: "exec", msg: []pgproto3.FrontendMessage{&pgproto3.Execute{Portal: "a"}}},
{desc: "close", msg: []pgproto3.FrontendMessage{&pgproto3.Close{}}},
} {
t.Run(tc.desc, func(t *testing.T) {
for _, msg := range tc.msg {
fe.Send(msg)
}
fe.Send(&pgproto3.Sync{})
err := fe.Flush()
require.NoError(t, err)
var pgErr *pgconn.PgError
done := false
for !done {
recv, err := fe.Receive()
require.NoError(t, err)
switch recv := recv.(type) {
case *pgproto3.ReadyForQuery:
done = true
case *pgproto3.ErrorResponse:
// Ensure we do not have multiple errors.
require.Nil(t, pgErr)
pgErr = pgconn.ErrorResponseToPgError(recv)
default:
t.Errorf("received unexpected message %#v", recv)
}
}
require.NotNil(t, pgErr)
require.Equal(t, pgcode.ProtocolViolation.String(), pgErr.Code)
require.Contains(t, pgErr.Message, "extended query protocol not supported in a replication connection")

// Ensure we can use the connection using the simple protocol.
rows := conn.Exec(ctx, "SELECT 1")
_, err = rows.ReadAll()
require.NoError(t, err)
require.NoError(t, rows.Close())
})
}
}
12 changes: 12 additions & 0 deletions pkg/sql/pgrepl/lsnutil/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "lsnutil",
srcs = ["lsnutil.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/pgrepl/lsnutil",
visibility = ["//visibility:public"],
deps = [
"//pkg/sql/pgrepl/lsn",
"//pkg/util/hlc",
],
)
25 changes: 25 additions & 0 deletions pkg/sql/pgrepl/lsnutil/lsnutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package lsnutil

import (
"time"

"github.com/cockroachdb/cockroach/pkg/sql/pgrepl/lsn"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// HLCToLSN converts a HLC to a LSN.
// It is in a separate package to prevent the `lsn` package importing `log`.
func HLCToLSN(h hlc.Timestamp) lsn.LSN {
// TODO(#105130): correctly populate this field.
return lsn.LSN(h.WallTime/int64(time.Millisecond)) << 32
}
Loading

0 comments on commit 8e614fc

Please sign in to comment.