Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
107394: cmd/roachtest: add disagg-rebalance roachtest r=renatolabs a=itsbilal

This test adds a roachtest that spins up a cluster with 3 nodes using S3 as the --experimental-shared-storage, and then adds a fourth node after loading a tpcc fixture and with a foreground workload running on it. It confirms the fourth node gets hydrated without transferring all live bytes over the wire.

Epic: none
Fixes: #103030

Release note: None

108154: kvcoord: refactor ambiguous commit tests r=AlexTalks a=AlexTalks

In #107323, testing for the ambiguous write case that leads to the "transaction unexpectedly committed" bug was introduced; however, to increase test coverage of the fix, multiple schedules of operations need to be tested. This change simply refactors the framework of the existing test in order to enable the addition of multiple subtests. The subtests are included in a separate patch.

Part of: #103817

Release note: None

108819: roachtest: add a c2c cutover `TO LATEST` test r=lidorcarmel a=lidorcarmel

We only have c2c roachtests that cut over to the past; this change adds one that cuts over to LATEST. It uses the `TO LATEST` SQL syntax because we expect that to be used more in production.

Epic: none

Release note: None

108910: streamingccl: minor log updates and code reorg r=lidorcarmel a=stevendanna

See individual commits.

Epic: none

108914: sqlproxyccl: do not report BackendDown metrics on throttle and routing errors r=JeffSwenson,andy-kimball a=jaylim-crl

#### sqlproxyccl: do not report BackendDown metrics on throttle and routing errors

Previously, we were reporting the backend_down metric on the following errors:
- codeProxyRefusedConnection
- codeParamsRoutingFailed
- codeUnavailable

These errors do not imply that the backend is down. We originally introduced
this in #57431, but looking at the PR, it appears unintentional. This commit
fixes that by not reporting the backend_down metric when the proxy returns
such errors.

Release note: None

Epic: none

#### sqlproxyccl: rename codeBackendDown to codeBackendDialFailed

This commit renames codeBackendDown to codeBackendDialFailed to avoid
confusing developers. Note that we don't rename the metric here to avoid
breaking downstream consumers. At the same time, we remove the old
codeBackendRefusedTLS code as it does not serve any purpose, and there was
no metric for it either.

Release note: None

Epic: none



Release justification: This fixes accuracy issues with SQL Proxy metrics.

108920: util/log: add custom crash tags to sentry r=dhartunian a=pjtatlow

In #106786 we added the ability to provide an environment variable that was meant to add custom tags to sentry crash reports. That change added the function that would create the map of crash report tags / values, but it was never actually used. This change ensures that tags from that environment variable will actually show up in the sentry reports.

Release note: None

Epic: None

Co-authored-by: Bilal Akhtar <[email protected]>
Co-authored-by: Alex Sarkesian <[email protected]>
Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Jay <[email protected]>
Co-authored-by: PJ Tatlow <[email protected]>
  • Loading branch information
7 people committed Aug 17, 2023
7 parents 02065f5 + 97f17ff + 5696b4e + 331cd87 + 97b47e5 + 95eb34d + 595376f commit 07266c3
Show file tree
Hide file tree
Showing 19 changed files with 604 additions and 427 deletions.
26 changes: 7 additions & 19 deletions pkg/ccl/sqlproxyccl/backend_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var BackendDial = func(
if err != nil {
return nil, withCode(
errors.Wrap(err, "unable to reach backend SQL server"),
codeBackendDown)
codeBackendDialFailed)
}

// Try to upgrade the PG connection to use SSL.
Expand All @@ -46,44 +46,32 @@ var BackendDial = func(
if tlsConfig != nil {
// Send SSLRequest.
if err := binary.Write(conn, binary.BigEndian, pgSSLRequest); err != nil {
return withCode(
errors.Wrap(err, "sending SSLRequest to target server"),
codeBackendDown)
return errors.Wrap(err, "sending SSLRequest to target server")
}
response := make([]byte, 1)
if _, err = io.ReadFull(conn, response); err != nil {
return withCode(
errors.New("reading response to SSLRequest"),
codeBackendDown)
return errors.New("reading response to SSLRequest")
}
if response[0] != pgAcceptSSLRequest {
return withCode(
errors.New("target server refused TLS connection"),
codeBackendRefusedTLS)
return errors.New("target server refused TLS connection")
}
conn = tls.Client(conn, tlsConfig.Clone())
}

// Forward startup message to the backend connection.
if _, err := conn.Write(msg.Encode(nil)); err != nil {
return withCode(
errors.Wrapf(err, "relaying StartupMessage to target server %v", serverAddress),
codeBackendDown)
return errors.Wrapf(err, "relaying StartupMessage to target server %v", serverAddress)
}

return nil
}()
if ctx.Err() != nil {
// If the context is cancelled, overwrite the error because closing the
// connection caused the connection to fail at an arbitrary step.
err = withCode(
errors.Wrapf(ctx.Err(), "unable to negotiate connection with %s", serverAddress),
codeBackendDown,
)
err = errors.Wrapf(ctx.Err(), "unable to negotiate connection with %s", serverAddress)
}
if err != nil {
_ = conn.Close()
return nil, err
return nil, withCode(err, codeBackendDialFailed)
}

return conn, nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/sqlproxyccl/backend_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,22 @@ func TestBackendDialTLS(t *testing.T) {
name: "tenant10To11",
addr: sql11.SQLAddr(),
tenantID: 10,
errCode: codeBackendDown,
errCode: codeBackendDialFailed,
}, {
name: "tenant11To10",
addr: sql10.SQLAddr(),
tenantID: 11,
errCode: codeBackendDown,
errCode: codeBackendDialFailed,
}, {
name: "tenant10ToStorage",
addr: storageServer.SystemLayer().AdvSQLAddr(),
tenantID: 10,
errCode: codeBackendDown,
errCode: codeBackendDialFailed,
}, {
name: "tenantWithNodeIDToStoage",
addr: storageServer.SystemLayer().AdvSQLAddr(),
tenantID: uint64(storageServer.NodeID()),
errCode: codeBackendDown,
errCode: codeBackendDialFailed,
}}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (c *connector) dialSQLServer(
return err
})
if err != nil {
if getErrorCode(err) == codeBackendDown {
if getErrorCode(err) == codeBackendDialFailed {
return nil, markAsRetriableConnectorError(err)
}
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,14 +888,14 @@ func TestConnector_dialSQLServer(t *testing.T) {
require.Equal(t, c.StartupMsg, msg)
require.Equal(t, "127.0.0.2:4567", serverAddress)
require.Nil(t, tlsConfig)
return nil, withCode(errors.New("bar"), codeBackendDown)
return nil, withCode(errors.New("bar"), codeBackendDialFailed)
},
)()
sa := balancer.NewServerAssignment(tenantID, tracker, nil, "127.0.0.2:4567")
defer sa.Close()

conn, err := c.dialSQLServer(ctx, sa)
require.EqualError(t, err, "codeBackendDown: bar")
require.EqualError(t, err, "codeBackendDialFailed: bar")
require.True(t, isRetriableConnectorError(err))
require.Nil(t, conn)
})
Expand Down
10 changes: 3 additions & 7 deletions pkg/ccl/sqlproxyccl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,9 @@ const (
// on the client's session parameters.
codeParamsRoutingFailed

// codeBackendDown indicates an error establishing or maintaining a connection
// to the backend SQL server.
codeBackendDown

// codeBackendRefusedTLS indicates that the backend SQL server refused a TLS-
// enabled SQL connection.
codeBackendRefusedTLS
// codeBackendDialFailed indicates an error establishing a connection
// between the proxy and the backend SQL server.
codeBackendDialFailed

// codeBackendDisconnected indicates that the backend disconnected (with a
// connection error) while serving client traffic.
Expand Down
19 changes: 8 additions & 11 deletions pkg/ccl/sqlproxyccl/errorcode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions pkg/ccl/sqlproxyccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,13 @@ func (metrics *metrics) updateForError(err error) {
metrics.ClientDisconnectCount.Inc(1)
case codeProxyRefusedConnection:
metrics.RefusedConnCount.Inc(1)
metrics.BackendDownCount.Inc(1)
case codeParamsRoutingFailed, codeUnavailable:
metrics.RoutingErrCount.Inc(1)
metrics.BackendDownCount.Inc(1)
case codeBackendDown:
case codeBackendDialFailed:
// NOTE: Historically, we had the code named codeBackendDown instead of
// codeBackendDialFailed. This has been renamed to codeBackendDialFailed
// for accuracy, and to prevent confusion by developers. We don't rename
// the metrics here as that may break downstream consumers.
metrics.BackendDownCount.Inc(1)
case codeAuthFailed:
metrics.AuthFailedCount.Inc(1)
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/sqlproxyccl/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func TestMetricsUpdateForError(t *testing.T) {

{codeExpiredClientConnection, []*metric.Counter{m.ExpiredClientConnCount}},

{codeProxyRefusedConnection, []*metric.Counter{m.RefusedConnCount, m.BackendDownCount}},
{codeProxyRefusedConnection, []*metric.Counter{m.RefusedConnCount}},

{codeParamsRoutingFailed, []*metric.Counter{m.RoutingErrCount, m.BackendDownCount}},
{codeUnavailable, []*metric.Counter{m.RoutingErrCount, m.BackendDownCount}},
{codeParamsRoutingFailed, []*metric.Counter{m.RoutingErrCount}},
{codeUnavailable, []*metric.Counter{m.RoutingErrCount}},

{codeBackendDown, []*metric.Counter{m.BackendDownCount}},
{codeBackendDialFailed, []*metric.Counter{m.BackendDownCount}},

{codeAuthFailed, []*metric.Counter{m.AuthFailedCount}},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func toPgError(err error) *pgproto3.ErrorResponse {
switch getErrorCode(err) {
// These are send as is.
case codeExpiredClientConnection,
codeBackendDown,
codeBackendDialFailed,
codeParamsRoutingFailed,
codeClientDisconnected,
codeBackendDisconnected,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func TestBackendDownRetry(t *testing.T) {
if callCount >= 3 {
directoryServer.DeleteTenant(roachpb.MustMakeTenantID(28))
}
return nil, withCode(errors.New("SQL pod is down"), codeBackendDown)
return nil, withCode(errors.New("SQL pod is down"), codeBackendDialFailed)
})()

// Valid connection, but no backend server running.
Expand Down Expand Up @@ -1307,7 +1307,7 @@ func TestDirectoryConnect(t *testing.T) {
if countFailures >= 3 {
return nil, withCode(errors.New("backend disconnected"), codeBackendDisconnected)
}
return nil, withCode(errors.New("backend down"), codeBackendDown)
return nil, withCode(errors.New("backend down"), codeBackendDialFailed)
})()

// Ensure that Directory.ReportFailure is being called correctly.
Expand Down
64 changes: 64 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand All @@ -25,6 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

type streamIngestManagerImpl struct {
Expand Down Expand Up @@ -85,6 +88,67 @@ func newStreamIngestManagerWithPrivilegesCheck(
}, nil
}

// applyCutoverTime records cutoverTimestamp on the progress of the stream
// ingestion job identified by ingestionJobID, using txn for the job update,
// and then unpauses the job if necessary.
//
// An error is returned if the job's replication status already shows that it
// is cutting over, if the job update fails, or if the unpause request fails.
func applyCutoverTime(
	ctx context.Context,
	jobRegistry *jobs.Registry,
	txn isql.Txn,
	ingestionJobID jobspb.JobID,
	cutoverTimestamp hlc.Timestamp,
) error {
	log.Infof(ctx, "adding cutover time %s to job record", cutoverTimestamp)

	// setCutover is the update callback handed to the job registry. It
	// rejects jobs that are already cutting over and otherwise stamps the
	// requested cutover time onto the job's progress.
	setCutover := func(_ isql.Txn, meta jobs.JobMetadata, updater *jobs.JobUpdater) error {
		ingestProgress := meta.Progress.GetStreamIngest()
		ingestDetails := meta.Payload.GetStreamIngestion()
		if ingestProgress.ReplicationStatus == jobspb.ReplicationCuttingOver {
			return errors.Newf("job %d already started cutting over to timestamp %s",
				ingestionJobID, ingestProgress.CutoverTime)
		}

		ingestProgress.ReplicationStatus = jobspb.ReplicationPendingCutover
		// The cutover time is the sentinel polled by the stream ingestion
		// job to check whether completion has been signaled.
		ingestProgress.CutoverTime = cutoverTimestamp
		ingestProgress.RemainingCutoverSpans = roachpb.Spans{ingestDetails.Span}
		updater.UpdateProgress(meta.Progress)
		return nil
	}

	err := jobRegistry.UpdateJobWithTxn(ctx, ingestionJobID, txn, false, setCutover)
	if err != nil {
		return err
	}

	// Unpause the job if it is paused.
	return jobRegistry.Unpause(ctx, txn, ingestionJobID)
}

// getReplicationStatsAndStatus loads the job identified by ingestionJobID
// using txn and returns its stream ingestion stats together with a status
// string.
//
// On any failure (the job cannot be loaded, it is not a stream ingestion job,
// its source URI cannot be redacted, or its stats cannot be computed) the
// stats are nil and the status is jobspb.ReplicationError's string form. When
// the job's status is jobs.StatusPaused, the status is
// jobspb.ReplicationPaused's string form; otherwise it is the replication
// status carried in the stats' ingestion progress.
func getReplicationStatsAndStatus(
	ctx context.Context, jobRegistry *jobs.Registry, txn isql.Txn, ingestionJobID jobspb.JobID,
) (*streampb.StreamIngestionStats, string, error) {
	// failWith builds the return triple shared by every error path.
	failWith := func(cause error) (*streampb.StreamIngestionStats, string, error) {
		return nil, jobspb.ReplicationError.String(), cause
	}

	ingestionJob, loadErr := jobRegistry.LoadJobWithTxn(ctx, ingestionJobID, txn)
	if loadErr != nil {
		return failWith(loadErr)
	}

	ingestionDetails, isIngestion := ingestionJob.Details().(jobspb.StreamIngestionDetails)
	if !isIngestion {
		return failWith(
			errors.Newf("job with id %d is not a stream ingestion job", ingestionJob.ID()))
	}

	// Replace the stream address with its redacted form before the details
	// are used to compute stats.
	redactedAddr, redactErr := redactSourceURI(ingestionDetails.StreamAddress)
	ingestionDetails.StreamAddress = redactedAddr
	if redactErr != nil {
		return failWith(redactErr)
	}

	stats, statsErr := replicationutils.GetStreamIngestionStatsNoHeartbeat(
		ctx, ingestionDetails, ingestionJob.Progress())
	if statsErr != nil {
		return failWith(statsErr)
	}

	// A paused job reports "paused" regardless of the replication status
	// recorded in its progress.
	if ingestionJob.Status() != jobs.StatusPaused {
		return stats, stats.IngestionProgress.ReplicationStatus.String(), nil
	}
	return stats, jobspb.ReplicationPaused.String(), nil
}

// init installs newStreamIngestManagerWithPrivilegesCheck as
// repstream.GetStreamIngestManagerHook when this package is initialized.
func init() {
repstream.GetStreamIngestManagerHook = newStreamIngestManagerWithPrivilegesCheck
}
Loading

0 comments on commit 07266c3

Please sign in to comment.