Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
60281: sql/pgwire: send placeholder BackendKeyData r=asubiotto,spaskob a=rafiss

fixes #13191 

Some tools expect this message to be returned at connection time and
will not connect without it. CockroachDB does not support pgwire
cancellation, but we can still send a placeholder value here, and
continue ignoring cancellation requests like we already do.

Added a small test to make sure nothing broke.

Release note (sql change): When a connection is established, CockroachDB
will now return a placeholder BackendKeyData message in the response.
This is for compatibility with some tools, but using the BackendKeyData
to cancel a query will still have no effect, just the same as before.



60429: kv: (re-)introduce a stopgap for lack of ReplicaState synchronization r=irfansharif a=irfansharif

See #59194 and #58489 for more details.

In #58489 we observed a scary lack of synchronization around how we set
the ReplicaState for a given replica, and how we mark a replica as
"initialized". What this meant is that it was possible for the entry in
Store.mu.replicas to be both "initialized" and have an empty
ReplicaState. This is now more likely to bite us given the migrations
infrastructure attempts to purge outdated replicas at start up time
(when replicas are being initialized, and we're iterating through extan
replicas in the Store.mu.replicas map).

We believed this was addressed as part of #58378, but that appears not
to be the case. Lets re-introduce this stop-gap while we investigate.

Release note: None

60441: bazel: quash unnecessary dependency on `pkg/util/uuid` from protos r=rickystewart a=rickystewart

This dependency can be replaced with a few `# keep` deps in a few choice
proto targets, which is what we should have done the whole time anyway.
This fixes build failures elsewhere in tree -- for example,
`pkg/util/uuid:uuid_test`, which doesn't play nicely with `rules_go` in
the presence of this dependency.

Fixes #59778.

Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
  • Loading branch information
4 people committed Feb 10, 2021
4 parents ffcd641 + c749a60 + 4880248 + f43bffb commit 5095a3c
Show file tree
Hide file tree
Showing 17 changed files with 126 additions and 25 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/sqlproxyccl/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func authenticate(clientConn, crdbConn net.Conn) error {
case *pgproto3.ParameterStatus:
// Server sent status message; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered.
case *pgproto3.BackendKeyData:
// Server sent backend key data; keep reading messages until
// `pgproto3.ReadyForQuery` is encountered.
case *pgproto3.ErrorResponse:
// Server has rejected the authentication response from the client and
// has closed the connection.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func TestAuthenticateUnexpectedMessage(t *testing.T) {
fe := pgproto3.NewFrontend(pgproto3.NewChunkReader(cli), cli)

go func() {
err := be.Send(&pgproto3.BackendKeyData{})
err := be.Send(&pgproto3.BindComplete{})
require.NoError(t, err)
beMsg, err := fe.Receive()
require.NoError(t, err)
require.Equal(t, beMsg, &pgproto3.BackendKeyData{})
require.Equal(t, beMsg, &pgproto3.BindComplete{})
}()

err := authenticate(srv, cli)
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/utilccl/licenseccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,8 @@ go_proto_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/utilccl/licenseccl",
proto = ":licenseccl_proto",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto"],
deps = [
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
],
)
2 changes: 0 additions & 2 deletions pkg/cmd/protoc-gen-gogoroach/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ go_proto_compiler(
valid_archive = True,
visibility = ["//visibility:public"],
deps = [
"//pkg/util/uuid",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//sortkeys",
"@com_github_gogo_protobuf//types",
Expand All @@ -61,7 +60,6 @@ go_proto_compiler(
valid_archive = True,
visibility = ["//visibility:public"],
deps = [
"//pkg/util/uuid",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//sortkeys",
"@com_github_gogo_protobuf//types",
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_proto_library(
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/sem/tree", # keep
"//pkg/util/hlc",
"//pkg/util/uuid", # keep
"@com_github_cockroachdb_errors//errorspb",
"@com_github_gogo_protobuf//gogoproto",
],
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_proto_library(
"//pkg/roachpb",
"//pkg/storage/enginepb",
"//pkg/util/hlc",
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
],
)
1 change: 1 addition & 0 deletions pkg/kv/kvserver/protectedts/ptpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_proto_library(
deps = [
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
"@org_golang_google_genproto//googleapis/api/annotations:go_default_library",
],
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,11 @@ func (r *Replica) GetGCThreshold() hlc.Timestamp {

// Version returns the replica version.
func (r *Replica) Version() roachpb.Version {
if r.mu.state.Version == nil {
// TODO(irfansharif,tbg): This is a stop-gap for #58523.
return roachpb.Version{}
}

r.mu.RLock()
defer r.mu.RUnlock()
return *r.mu.state.Version
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2719,6 +2719,10 @@ func (s *Store) PurgeOutdatedReplicas(ctx context.Context, version roachpb.Versi
qp := quotapool.NewIntPool("purge-outdated-replicas", 50)
g := ctxgroup.WithContext(ctx)
s.VisitReplicas(func(repl *Replica) (wantMore bool) {
if (repl.Version() == roachpb.Version{}) {
// TODO(irfansharif,tbg): This is a stop gap for #58523.
return true
}
if !repl.Version().Less(version) {
// Nothing to do here.
return true
Expand Down
1 change: 1 addition & 0 deletions pkg/server/serverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ go_proto_library(
"//pkg/util",
"//pkg/util/log/logpb",
"//pkg/util/metric",
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
# NB: The grpc-gateway compiler injects a dependency on the descriptor
# package that Gazelle isn't prepared to deal with.
Expand Down
33 changes: 26 additions & 7 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,8 @@ func (c *conn) serveImpl(
dummyCh := make(chan error)
close(dummyCh)
procCh = dummyCh
// An initial readyForQuery message is part of the handshake.
c.msgBuilder.initMsg(pgwirebase.ServerMsgReady)
c.msgBuilder.writeByte(byte(sql.IdleTxnBlock))
if err := c.msgBuilder.finishMsg(c.conn); err != nil {

if err := c.sendReadyForQuery(); err != nil {
reserved.Close(ctx)
return
}
Expand Down Expand Up @@ -688,14 +686,35 @@ func (c *conn) sendInitialConnData(
if err := c.sendParamStatus("is_superuser", superUserVal); err != nil {
return sql.ConnectionHandler{}, err
}
if err := c.sendReadyForQuery(); err != nil {
return sql.ConnectionHandler{}, err
}
return connHandler, nil
}

// An initial readyForQuery message is part of the handshake.
// sendReadyForQuery sends the final messages of the connection handshake.
// This includes a placeholder BackendKeyData message and a ServerMsgReady
// message indicating that there is no active transaction.
func (c *conn) sendReadyForQuery() error {
// Send the client a dummy BackendKeyData message. This is necessary for
// compatibility with tools that require this message. This information is
// normally used by clients to send a CancelRequest message:
// https://www.postgresql.org/docs/9.6/static/protocol-flow.html#AEN112861
// CockroachDB currently ignores all CancelRequests.
c.msgBuilder.initMsg(pgwirebase.ServerMsgBackendKeyData)
c.msgBuilder.putInt32(0)
c.msgBuilder.putInt32(0)
if err := c.msgBuilder.finishMsg(c.conn); err != nil {
return err
}

// An initial ServerMsgReady message is part of the handshake.
c.msgBuilder.initMsg(pgwirebase.ServerMsgReady)
c.msgBuilder.writeByte(byte(sql.IdleTxnBlock))
if err := c.msgBuilder.finishMsg(c.conn); err != nil {
return sql.ConnectionHandler{}, err
return err
}
return connHandler, nil
return nil
}

// An error is returned iff the statement buffer has been closed. In that case,
Expand Down
44 changes: 44 additions & 0 deletions pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1521,3 +1521,47 @@ func TestSetSessionArguments(t *testing.T) {
t.Fatal(err)
}
}

// TestCancelQuery uses the pgwire-level query cancellation protocol provided
// by lib/pq to make sure that canceling a query has no effect, and makes sure
// the dummy BackendKeyData does not cause problems.
func TestCancelQuery(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

cancelCtx, cancel := context.WithCancel(context.Background())
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
BeforeExecute: func(ctx context.Context, stmt string) {
if strings.Contains(stmt, "pg_sleep") {
cancel()
}
},
},
},
}
s, _, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(cancelCtx)

pgURL, cleanupFunc := sqlutils.PGUrl(
t, s.ServingSQLAddr(), "TestCancelQuery" /* prefix */, url.User(security.RootUser),
)
defer cleanupFunc()

db, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer db.Close()

// Cancellation has no effect on ongoing query.
if _, err := db.QueryContext(cancelCtx, "select pg_sleep(0)"); err != nil {
t.Fatalf("unexpected error: %s", err)
}

// Context is already canceled, so error should come before execution.
if _, err := db.QueryContext(cancelCtx, "select 1"); err == nil {
t.Fatal("expected error")
} else if err.Error() != "context canceled" {
t.Fatalf("unexpected error: %s", err)
}
}
1 change: 1 addition & 0 deletions pkg/sql/pgwire/pgwirebase/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
ClientMsgTerminate ClientMessageType = 'X'

ServerMsgAuth ServerMessageType = 'R'
ServerMsgBackendKeyData ServerMessageType = 'K'
ServerMsgBindComplete ServerMessageType = '2'
ServerMsgCommandComplete ServerMessageType = 'C'
ServerMsgCloseComplete ServerMessageType = '3'
Expand Down
28 changes: 16 additions & 12 deletions pkg/sql/pgwire/pgwirebase/servermessagetype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/storage/enginepb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/util/hlc",
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
],
)
Expand Down
12 changes: 12 additions & 0 deletions pkg/testutils/pgtest/pgtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,22 @@ func NewPGTest(ctx context.Context, addr, user string) (*PGTest, error) {
}
msgs, err := p.Until(false /* keepErrMsg */, &pgproto3.ReadyForQuery{})
foundCrdb := false
var backendKeyData *pgproto3.BackendKeyData
for _, msg := range msgs {
if s, ok := msg.(*pgproto3.ParameterStatus); ok && s.Name == "crdb_version" {
foundCrdb = true
}
if d, ok := msg.(*pgproto3.BackendKeyData); ok {
// We inspect the BackendKeyData outside of the loop since we only
// want to do the assertions if foundCrdb==true.
backendKeyData = d
}
}
if backendKeyData == nil {
return nil, errors.Errorf("did not receive BackendKeyData")
}
if foundCrdb && (backendKeyData.ProcessID != 0 || backendKeyData.SecretKey != 0) {
return nil, errors.Errorf("unexpected BackendKeyData: %+v", d)
}
p.isCockroachDB = foundCrdb
success = err == nil
Expand Down
5 changes: 4 additions & 1 deletion pkg/util/protoutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,8 @@ go_proto_library(
importpath = "github.com/cockroachdb/cockroach/pkg/util/protoutil",
proto = ":protoutil_proto",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto"],
deps = [
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
],
)

0 comments on commit 5095a3c

Please sign in to comment.