Skip to content

Commit

Permalink
streamingccl: minor error style cleanups
Browse files Browse the repository at this point in the history
Capitalized error messages are rare in the code base, so I've made
these more consistent with the rest of the code base.

Release note: None
  • Loading branch information
stevendanna committed Jun 24, 2022
1 parent c09d7fc commit b4ed9cb
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (p *partitionedStreamClient) Create(

row := conn.QueryRowContext(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
if row.Err() != nil {
return streamID, errors.Wrapf(row.Err(), "Error in creating replication stream for tenant %s", tenantID.String())
return streamID, errors.Wrapf(row.Err(), "error creating replication stream for tenant %s", tenantID.String())
}

err = row.Scan(&streamID)
Expand All @@ -85,7 +85,7 @@ func (p *partitionedStreamClient) Heartbeat(
`SELECT crdb_internal.replication_stream_progress($1, $2)`, streamID, consumed.String())
if row.Err() != nil {
return streampb.StreamReplicationStatus{},
errors.Wrapf(row.Err(), "Error in sending heartbeats to replication stream %d", streamID)
errors.Wrapf(row.Err(), "error sending heartbeat to replication stream %d", streamID)
}

var rawStatus []byte
Expand Down Expand Up @@ -121,7 +121,7 @@ func (p *partitionedStreamClient) Plan(

row := conn.QueryRowContext(ctx, `SELECT crdb_internal.replication_stream_spec($1)`, streamID)
if row.Err() != nil {
return nil, errors.Wrap(row.Err(), "Error in planning a replication stream")
return nil, errors.Wrapf(row.Err(), "error planning replication stream %d", streamID)
}

var rawSpec []byte
Expand Down Expand Up @@ -207,7 +207,7 @@ func (p *partitionedStreamClient) Complete(ctx context.Context, streamID streami
}
row := conn.QueryRowContext(ctx, `SELECT crdb_internal.complete_replication_stream($1)`, streamID)
if row.Err() != nil {
return errors.Wrap(row.Err(), "Error in completing a replication stream")
return errors.Wrapf(row.Err(), "error completing replication stream %d", streamID)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func getReplicationStreamSpec(
// Returns error if the replication stream is not active
j, err := jobExecCtx.ExecCfg().JobRegistry.LoadJob(evalCtx.Ctx(), jobspb.JobID(streamID))
if err != nil {
return nil, errors.Wrapf(err, "Replication stream %d has error", streamID)
return nil, errors.Wrapf(err, "replication stream %d has error", streamID)
}
if j.Status() != jobs.StatusRunning {
return nil, errors.Errorf("Replication stream %d is not running", streamID)
return nil, errors.Errorf("replication stream %d is not running", streamID)
}

// Partition the spans with SQLPlanner
Expand Down

0 comments on commit b4ed9cb

Please sign in to comment.