streamingccl: avoid double logging
In updateRunningStatus we log the running status. In most cases we
were also logging it _before_ calling that function. Here, we stop
doing that.

Additionally, I cleaned up a few log messages that I think were
unnecessarily verbose.

Epic: none

Release note: None
stevendanna committed Aug 17, 2023
1 parent 8359621 commit 97b47e5
Showing 2 changed files with 26 additions and 38 deletions.
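
As context for the diff below, here is a minimal, hypothetical sketch (not the actual CockroachDB code) of the pattern this commit removes: callers built a status message, logged it themselves, and then passed the same message to updateRunningStatus, which logs it again as a side effect.

package main

import "log"

// updateRunningStatus is a stand-in for the job helper touched in this patch:
// it persists the running status on the job record (elided here) and logs it
// as a side effect.
func updateRunningStatus(status string) {
	// ... persist the status on the job record ...
	log.Printf("%s", status)
}

func main() {
	msg := "resuming stream (producer job 123) from ts1"

	// Before this commit: the caller logged the message and then called the
	// helper, which logged it again -- the double logging being removed.
	log.Printf("%s", msg)
	updateRunningStatus(msg)

	// After this commit: callers rely on updateRunningStatus alone to log.
	updateRunningStatus(msg)
}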
pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go (20 changes: 5 additions & 15 deletions)
@@ -10,7 +10,6 @@ package streamingest
 
 import (
 	"context"
-	"fmt"
 	"math"
 	"sort"
 	"time"
@@ -36,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
+	"github.com/cockroachdb/redact"
 )
 
 var replanThreshold = settings.RegisterFloatSetting(
@@ -78,19 +78,14 @@ func startDistIngestion(
 		heartbeatTimestamp = initialScanTimestamp
 	}
 
-	msg := fmt.Sprintf("resuming stream (producer job %d) from %s",
-		streamID, heartbeatTimestamp)
+	msg := redact.Sprintf("resuming stream (producer job %d) from %s", streamID, heartbeatTimestamp)
 	updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg)
 
 	client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB)
 	if err != nil {
 		return err
 	}
-	defer func() {
-		if err := client.Close(ctx); err != nil {
-			log.Warningf(ctx, "stream ingestion client did not shut down properly: %s", err.Error())
-		}
-	}()
+	defer closeAndLog(ctx, client)
 	if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil {
 		return err
 	}
@@ -155,8 +150,6 @@ func startDistIngestion(
 	)
 
 	execInitialPlan := func(ctx context.Context) error {
-		log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d",
-			ingestionJob.ID())
 		defer stopReplanner()
 		ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)
 
@@ -180,8 +173,7 @@
 		return rw.Err()
 	}
 
-	updateRunningStatus(ctx, ingestionJob, jobspb.Replicating,
-		"running the SQL flow for the stream ingestion job")
+	updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "physical replication running")
 	err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner)
 	if errors.Is(err, sql.ErrPlanChanged) {
 		execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1)
@@ -209,9 +201,7 @@ func (p *replicationFlowPlanner) makePlan(
 	gatewayID base.SQLInstanceID,
 ) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
 	return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
-		log.Infof(ctx, "Generating DistSQL plan candidate for stream ingestion job %d",
-			ingestionJobID)
-
+		log.Infof(ctx, "generating DistSQL plan candidate")
 		streamID := streampb.StreamID(details.StreamID)
 		topology, err := client.Plan(ctx, streamID)
 		if err != nil {
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go (44 changes: 21 additions & 23 deletions)
@@ -35,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/redact"
 )
 
 type streamIngestionResumer struct {
@@ -49,7 +50,7 @@ func connectToActiveClient(
 	streamAddresses := progress.GetStreamIngest().StreamAddresses
 
 	if len(streamAddresses) > 0 {
-		log.Infof(ctx, "ingestion job %d attempting to connect to existing stream addresses", ingestionJob.ID())
+		log.Infof(ctx, "attempting to connect to existing stream addresses")
 		client, err := streamclient.GetFirstActiveClient(ctx, streamAddresses)
 		if err == nil {
 			return client, err
@@ -71,14 +72,16 @@ func updateRunningStatus(
 	ctx context.Context,
 	ingestionJob *jobs.Job,
 	status jobspb.ReplicationStatus,
-	runningStatus string,
+	runningStatus redact.RedactableString,
 ) {
 	err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
-		updateRunningStatusInternal(md, ju, status, runningStatus)
+		updateRunningStatusInternal(md, ju, status, string(runningStatus.Redact()))
 		return nil
 	})
 	if err != nil {
 		log.Warningf(ctx, "error when updating job running status: %s", err)
+	} else if status == jobspb.ReplicationError {
+		log.Warningf(ctx, "%s", runningStatus)
 	} else {
 		log.Infof(ctx, "%s", runningStatus)
 	}
@@ -101,10 +104,9 @@ func completeIngestion(
 		return err
 	}
 
-	streamID := details.StreamID
-	log.Infof(ctx, "completing the producer job %d", streamID)
-	updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationCuttingOver,
-		"completing the producer job in the source cluster")
+	msg := redact.Sprintf("completing the producer job %d in the source cluster",
+		details.StreamID)
+	updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationCuttingOver, msg)
 	completeProducerJob(ctx, ingestionJob, execCtx.ExecCfg().InternalDB, true)
 
 	// Now that we have completed the cutover we can release the protected
@@ -138,12 +140,7 @@ func completeProducerJob(
 			if err != nil {
 				return err
 			}
-			defer func() {
-				if err := client.Close(ctx); err != nil {
-					log.Warningf(ctx, "error encountered when closing stream client: %s",
-						err.Error())
-				}
-			}()
+			defer closeAndLog(ctx, client)
 			return client.Complete(ctx, streampb.StreamID(streamID), successfulIngestion)
 		},
 	); err != nil {
@@ -209,10 +206,8 @@ func ingestWithRetries(
 		if jobs.IsPermanentJobError(err) || errors.Is(err, context.Canceled) {
 			break
 		}
-		const msgFmt = "waiting before retrying error: %s"
-		log.Warningf(ctx, msgFmt, err)
-		updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError,
-			fmt.Sprintf(msgFmt, err))
+		status := redact.Sprintf("waiting before retrying error: %s", err)
+		updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status)
 		newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob)
 		if lastReplicatedTime.Less(newReplicatedTime) {
 			r.Reset()
@@ -247,9 +242,8 @@ func loadReplicatedTime(ctx context.Context, db isql.DB, ingestionJob *jobs.Job)
 func (s *streamIngestionResumer) handleResumeError(
 	ctx context.Context, execCtx sql.JobExecContext, err error,
 ) error {
-	const errorFmt = "ingestion job failed (%s) but is being paused"
-	log.Warningf(ctx, errorFmt, err)
-	updateRunningStatus(ctx, s.job, jobspb.ReplicationError, fmt.Sprintf(errorFmt, err))
+	msg := redact.Sprintf("ingestion job failed (%s) but is being paused", err)
+	updateRunningStatus(ctx, s.job, jobspb.ReplicationError, msg)
 	// The ingestion job is paused but the producer job will keep
 	// running until it times out. Users can still resume ingestion before
 	// the producer job times out.
@@ -429,9 +423,7 @@ func maybeRevertToCutoverTimestamp(
 	if !shouldRevertToCutover {
 		return false, nil
 	}
-	log.Infof(ctx,
-		"reverting to cutover timestamp %s for stream ingestion job %d",
-		cutoverTimestamp, ingestionJob.ID())
+	log.Infof(ctx, "reverting to cutover timestamp %s", cutoverTimestamp)
 	if p.ExecCfg().StreamingTestingKnobs != nil && p.ExecCfg().StreamingTestingKnobs.AfterCutoverStarted != nil {
 		p.ExecCfg().StreamingTestingKnobs.AfterCutoverStarted()
 	}
@@ -523,6 +515,12 @@ func (s *streamIngestionResumer) OnFailOrCancel(
 	})
 }
 
+func closeAndLog(ctx context.Context, c streamclient.Client) {
+	if err := c.Close(ctx); err != nil {
+		log.Warningf(ctx, "error closing stream client: %s", err.Error())
+	}
+}
+
 // cutoverProgressTracker updates the job progress and the given
 // metric with the number of ranges still remainng to revert during
 // the cutover process.
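
For reference, a minimal sketch of how the redact package introduced above is typically used, relying only on redact.Sprintf and RedactableString.Redact from github.com/cockroachdb/redact as seen in the diff; the printed output is indicative rather than exact.

package main

import (
	"fmt"

	"github.com/cockroachdb/redact"
)

func main() {
	// redact.Sprintf keeps the format string as safe text and wraps the
	// arguments in redaction markers, returning a RedactableString.
	msg := redact.Sprintf("resuming stream (producer job %d) from %s", 123, "sometime")

	// Logged as-is, the argument values remain visible inside markers,
	// roughly: resuming stream (producer job ‹123›) from ‹sometime›
	fmt.Println(string(msg))

	// Redact() strips the unsafe argument values, roughly:
	// resuming stream (producer job ‹×›) from ‹×›
	// which is what the patch persists as the job's running status.
	fmt.Println(string(msg.Redact()))
}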
