Skip to content

Commit

Permalink
Merge #74112 #74145
Browse files Browse the repository at this point in the history
74112: ui/server: add ability to create conditional statement diagnostics r=lindseyjin a=lindseyjin

Resolves #57634

Previously, we did not have the ability to create conditional statement
diagnostics in the frontend. This commit adds in the ability to specify
a minimum execution latency and an expiry time when creating a statement
diagnostics request. These changes apply to both DB and CC console.

Since expired requests are not surfaced at all in the frontend, we have
also modified the statement diagnostics API response to not return
already expired and incomplete requests.

Lastly, this commit also deletes some unused code related to statement
diagnostics modals.

Release note (ui change): Add ability to create conditional statement
diagnostics by adding two new fields 1) minimum execution latency, which
specifies the limit for when a statement should be tracked and 2) expiry
time, which specifies when a diagnostics request should expire.

74145: streamingccl: Ensure appropriate privileges when replicating. r=miretskiy a=miretskiy

Add checks to replication stream manager to ensure appropriate privileges
and require enterprise license when executing streaming replication.

Release Notes: None

Co-authored-by: Lindsey Jin <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
3 people committed Dec 23, 2021
3 parents 5d15480 + 6c1b611 + 9cb4298 commit 5455dd3
Show file tree
Hide file tree
Showing 30 changed files with 670 additions and 362 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_test(
srcs = [
"main_test.go",
"producer_job_test.go",
"replication_manager_test.go",
"replication_stream_test.go",
],
embed = [":streamproducer"],
Expand All @@ -76,8 +77,11 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/distsql",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondatapb",
"//pkg/streaming",
"//pkg/testutils",
"//pkg/testutils/serverutils",
Expand Down
40 changes: 33 additions & 7 deletions pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
package streamproducer

import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -19,7 +23,7 @@ import (
type replicationStreamManagerImpl struct{}

// CompleteStreamIngestion implements ReplicationStreamManager interface.
func (r replicationStreamManagerImpl) CompleteStreamIngestion(
func (r *replicationStreamManagerImpl) CompleteStreamIngestion(
evalCtx *tree.EvalContext,
txn *kv.Txn,
streamID streaming.StreamID,
Expand All @@ -29,14 +33,14 @@ func (r replicationStreamManagerImpl) CompleteStreamIngestion(
}

// StartReplicationStream implements ReplicationStreamManager interface.
func (r replicationStreamManagerImpl) StartReplicationStream(
func (r *replicationStreamManagerImpl) StartReplicationStream(
evalCtx *tree.EvalContext, txn *kv.Txn, tenantID uint64,
) (streaming.StreamID, error) {
return startReplicationStreamJob(evalCtx, txn, tenantID)
}

// UpdateReplicationStreamProgress implements ReplicationStreamManager interface.
func (r replicationStreamManagerImpl) UpdateReplicationStreamProgress(
func (r *replicationStreamManagerImpl) UpdateReplicationStreamProgress(
evalCtx *tree.EvalContext, streamID streaming.StreamID, frontier hlc.Timestamp, txn *kv.Txn,
) (jobspb.StreamReplicationStatus, error) {
return heartbeatReplicationStream(evalCtx, streamID, frontier, txn)
Expand All @@ -45,14 +49,36 @@ func (r replicationStreamManagerImpl) UpdateReplicationStreamProgress(
// StreamPartition returns a value generator which yields events for the specified partition.
// opaqueSpec contains streampb.PartitionSpec protocol message.
// streamID specifies the streaming job this partition belongs too.
func (r replicationStreamManagerImpl) StreamPartition(
func (r *replicationStreamManagerImpl) StreamPartition(
evalCtx *tree.EvalContext, streamID streaming.StreamID, opaqueSpec []byte,
) (tree.ValueGenerator, error) {
return streamPartition(evalCtx, streamID, opaqueSpec)
}

func init() {
streaming.GetReplicationStreamManagerHook = func() (streaming.ReplicationStreamManager, error) {
return &replicationStreamManagerImpl{}, nil
func newReplicationStreamManagerWithPrivilegesCheck(
evalCtx *tree.EvalContext,
) (streaming.ReplicationStreamManager, error) {
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(evalCtx.Context)
if err != nil {
return nil, err
}

if !isAdmin {
return nil,
pgerror.New(pgcode.InsufficientPrivilege, "replication restricted to ADMIN role")
}

execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
enterpriseCheckErr := utilccl.CheckEnterpriseEnabled(
execCfg.Settings, execCfg.ClusterID(), execCfg.Organization(), "REPLICATION")
if enterpriseCheckErr != nil {
return nil, pgerror.Wrap(enterpriseCheckErr,
pgcode.InsufficientPrivilege, "replication requires enterprise license")
}

return &replicationStreamManagerImpl{}, nil
}

func init() {
streaming.GetReplicationStreamManagerHook = newReplicationStreamManagerWithPrivilegesCheck
}
88 changes: 88 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/replication_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamproducer

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/stretchr/testify/require"
)

func TestReplicationManagerRequiresAdminRole(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
tDB := sqlutils.MakeSQLRunner(sqlDB)

var sessionData sessiondatapb.SessionData
{
var sessionSerialized []byte
tDB.QueryRow(t, "SELECT crdb_internal.serialize_session()").Scan(&sessionSerialized)
require.NoError(t, protoutil.Unmarshal(sessionSerialized, &sessionData))
}

getManagerForUser := func(u string) (streaming.ReplicationStreamManager, error) {
sqlUser, err := security.MakeSQLUsernameFromUserInput(u, security.UsernameValidation)
require.NoError(t, err)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
txn := kvDB.NewTxn(ctx, "test")
p, cleanup := sql.NewInternalPlanner("test", txn, sqlUser, &sql.MemoryMetrics{}, &execCfg, sessionData)
defer cleanup()
ec := p.(interface{ EvalContext() *tree.EvalContext }).EvalContext()
return newReplicationStreamManagerWithPrivilegesCheck(ec)
}

for _, tc := range []struct {
user string
expErr string
isEnterprise bool
}{
{user: "admin", expErr: "", isEnterprise: true},
{user: "root", expErr: "", isEnterprise: true},
{user: "nobody", expErr: "replication restricted to ADMIN role", isEnterprise: true},
{user: "admin", expErr: "use of REPLICATION requires an enterprise license", isEnterprise: false},
{user: "root", expErr: "use of REPLICATION requires an enterprise license", isEnterprise: false},
{user: "nobody", expErr: "replication restricted to ADMIN role", isEnterprise: false},
} {
t.Run(fmt.Sprintf("%s/ent=%t", tc.user, tc.isEnterprise), func(t *testing.T) {
if tc.isEnterprise {
defer utilccl.TestingEnableEnterprise()()
} else {
defer utilccl.TestingDisableEnterprise()()
}

m, err := getManagerForUser(tc.user)
if tc.expErr == "" {
require.NoError(t, err)
require.NotNil(t, m)
} else {
require.Regexp(t, tc.expErr, err)
require.Nil(t, m)
}
})
}

}
12 changes: 9 additions & 3 deletions pkg/server/statement_diagnostics_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -115,10 +116,11 @@ func (s *statusServer) CancelStatementDiagnosticsReport(
return &response, nil
}

// StatementDiagnosticsRequests retrieves all of the statement
// diagnostics requests in the `system.statement_diagnostics_requests` table.
// StatementDiagnosticsRequests retrieves all statement diagnostics
// requests in the `system.statement_diagnostics_requests` table that
// have not yet expired.
func (s *statusServer) StatementDiagnosticsRequests(
ctx context.Context, req *serverpb.StatementDiagnosticsReportsRequest,
ctx context.Context, _ *serverpb.StatementDiagnosticsReportsRequest,
) (*serverpb.StatementDiagnosticsReportsResponse, error) {
ctx = propagateGatewayMetadata(ctx)
ctx = s.AnnotateCtx(ctx)
Expand Down Expand Up @@ -179,6 +181,10 @@ func (s *statusServer) StatementDiagnosticsRequests(
}
if expiresAt, ok := row[6].(*tree.DTimestampTZ); ok {
req.ExpiresAt = expiresAt.Time
// Don't return already expired requests.
if req.ExpiresAt.Before(timeutil.Now()) {
continue
}
}
}

Expand Down
44 changes: 44 additions & 0 deletions pkg/server/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2446,6 +2446,50 @@ func TestStatementDiagnosticsCompleted(t *testing.T) {
}
}

func TestStatementDiagnosticsDoesNotReturnExpiredRequests(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())
db := sqlutils.MakeSQLRunner(sqlDB)

statementFingerprint := "INSERT INTO test VALUES (_)"
expiresAfter := 5 * time.Millisecond

// Create statement diagnostics request with defined expiry time.
req := &serverpb.CreateStatementDiagnosticsReportRequest{
StatementFingerprint: statementFingerprint,
MinExecutionLatency: 500 * time.Millisecond,
ExpiresAfter: expiresAfter,
}
var resp serverpb.CreateStatementDiagnosticsReportResponse
if err := postStatusJSONProto(s, "stmtdiagreports", req, &resp); err != nil {
t.Fatal(err)
}

// Wait for request to expire.
time.Sleep(expiresAfter)

// Check that created statement diagnostics report is incomplete.
report := db.QueryStr(t, `
SELECT completed
FROM system.statement_diagnostics_requests
WHERE statement_fingerprint = $1`, statementFingerprint)

require.Equal(t, report[0][0], "false")

// Check that expired report is not returned in API response.
var respGet serverpb.StatementDiagnosticsReportsResponse
if err := getStatusJSONProto(s, "stmtdiagreports", &respGet); err != nil {
t.Fatal(err)
}

for _, report := range respGet.Reports {
require.NotEqual(t, report.StatementFingerprint, statementFingerprint)
}
}

func TestJobStatusResponse(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/sem/builtins/replication_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var replicationBuiltins = map[string]builtinDefinition{
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
mgr, err := streaming.GetReplicationStreamManager()
mgr, err := streaming.GetReplicationStreamManager(evalCtx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -80,7 +80,7 @@ var replicationBuiltins = map[string]builtinDefinition{
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
mgr, err := streaming.GetReplicationStreamManager()
mgr, err := streaming.GetReplicationStreamManager(evalCtx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,7 +114,7 @@ var replicationBuiltins = map[string]builtinDefinition{
},
ReturnType: tree.FixedReturnType(types.String),
Fn: func(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
mgr, err := streaming.GetReplicationStreamManager()
mgr, err := streaming.GetReplicationStreamManager(evalCtx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -150,13 +150,13 @@ var replicationBuiltins = map[string]builtinDefinition{
[]*types.T{types.Bytes},
[]string{"stream_event"},
),
func(ctx *tree.EvalContext, args tree.Datums) (tree.ValueGenerator, error) {
mgr, err := streaming.GetReplicationStreamManager()
func(evalCtx *tree.EvalContext, args tree.Datums) (tree.ValueGenerator, error) {
mgr, err := streaming.GetReplicationStreamManager(evalCtx)
if err != nil {
return nil, err
}
return mgr.StreamPartition(
ctx,
evalCtx,
streaming.StreamID(tree.MustBeDInt(args[0])),
[]byte(tree.MustBeDBytes(args[1])),
)
Expand Down
9 changes: 5 additions & 4 deletions pkg/streaming/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func (j StreamID) SafeValue() {}
// InvalidStreamID is the zero value for StreamID corresponding to no stream.
const InvalidStreamID StreamID = 0

// GetReplicationStreamManagerHook is the hook to get a collection of APIs that streaming replication supports.
var GetReplicationStreamManagerHook func() (ReplicationStreamManager, error)
// GetReplicationStreamManagerHook is the hook to get access to the replication API.
// Used by builtin functions to trigger streaming replication.
var GetReplicationStreamManagerHook func(evalCtx *tree.EvalContext) (ReplicationStreamManager, error)

// ReplicationStreamManager represents a collection of APIs that streaming replication supports.
type ReplicationStreamManager interface {
Expand Down Expand Up @@ -65,9 +66,9 @@ type ReplicationStreamManager interface {
}

// GetReplicationStreamManager returns a ReplicationStreamManager if a CCL binary is loaded.
func GetReplicationStreamManager() (ReplicationStreamManager, error) {
func GetReplicationStreamManager(evalCtx *tree.EvalContext) (ReplicationStreamManager, error) {
if GetReplicationStreamManagerHook == nil {
return nil, errors.New("replication streaming requires a CCL binary")
}
return GetReplicationStreamManagerHook()
return GetReplicationStreamManagerHook(evalCtx)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { fetchData } from "src/api";
const STATEMENT_DIAGNOSTICS_PATH = "/_status/stmtdiagreports";
const CREATE_STATEMENT_DIAGNOSTICS_REPORT_PATH = "/_status/stmtdiagreports";

type CreateStatementDiagnosticsReportRequestMessage = cockroach.server.serverpb.CreateStatementDiagnosticsReportRequest;
type CreateStatementDiagnosticsReportResponseMessage = cockroach.server.serverpb.CreateStatementDiagnosticsReportResponse;

export function getStatementDiagnosticsReports(): Promise<
Expand All @@ -26,14 +27,12 @@ export function getStatementDiagnosticsReports(): Promise<
}

export function createStatementDiagnosticsReport(
statementsFingerprint: string,
req: CreateStatementDiagnosticsReportRequestMessage,
): Promise<CreateStatementDiagnosticsReportResponseMessage> {
return fetchData(
cockroach.server.serverpb.CreateStatementDiagnosticsReportResponse,
CREATE_STATEMENT_DIAGNOSTICS_REPORT_PATH,
cockroach.server.serverpb.CreateStatementDiagnosticsReportRequest,
{
statement_fingerprint: statementsFingerprint,
},
req,
);
}
4 changes: 3 additions & 1 deletion pkg/ui/workspaces/cluster-ui/src/modal/modal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export interface ModalProps {
okText?: string;
cancelText?: string;
visible: boolean;
className?: string;
}

const cx = classNames.bind(styles);
Expand All @@ -34,11 +35,12 @@ export const Modal: React.FC<ModalProps> = ({
cancelText,
visible,
title,
className,
}) => {
return (
<AntModal
title={title && <Text textType={TextTypes.Heading3}>{title}</Text>}
className={cx("crl-modal")}
className={cx("crl-modal", className)}
visible={visible}
closeIcon={
<div className={cx("crl-modal__close-icon")} onClick={onCancel}>
Expand Down
Loading

0 comments on commit 5455dd3

Please sign in to comment.