Skip to content

Commit

Permalink
streamingccl: add a builtin function that retrives stream ingestion s…
Browse files Browse the repository at this point in the history
…tatitics

Support crdb_internal.stream_ingestion_stats(ingestion_job_id) to
running progress for a stream ingestion job.

Release note (sql change): Support crdb_internal.stream_ingestion_stats
(ingestion_job_id) to running progress for a stream ingestion job.

Release Justification: Cat 4.
  • Loading branch information
gh-casper committed Apr 11, 2022
1 parent 29de55a commit 50aa866
Show file tree
Hide file tree
Showing 15 changed files with 226 additions and 64 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2575,6 +2575,8 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
</span></td></tr>
<tr><td><a name="crdb_internal.start_replication_stream"></a><code>crdb_internal.start_replication_stream(tenant_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.stream_ingestion_stats"></a><code>crdb_internal.stream_ingestion_stats(job_id: <a href="int.html">int</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>This function can be used on the ingestion side to get a statistics summary in json format of a stream ingestion job</p>
</span></td></tr>
<tr><td><a name="crdb_internal.stream_partition"></a><code>crdb_internal.stream_partition(stream_id: <a href="int.html">int</a>, partition_spec: <a href="bytes.html">bytes</a>) &rarr; tuple{bytes AS stream_event}</code></td><td><span class="funcdesc"><p>Stream partition data</p>
</span></td></tr></tbody>
</table>
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ package streamclient

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -55,7 +55,7 @@ type Client interface {
// that source cluster protected timestamp _may_ be advanced up to the passed ts
// (which may be zero if no progress has been made e.g. during backfill).
// TODO(dt): ts -> checkpointToken.
Heartbeat(ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp) error
Heartbeat(ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp) (*streampb.StreamReplicationStatus, error)

// Plan returns a Topology for this stream.
// TODO(dt): separate target argument from address argument.
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand Down Expand Up @@ -43,8 +44,8 @@ func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Top
// Heartbeat implements the Client interface.
func (sc testStreamClient) Heartbeat(
ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp,
) error {
return nil
) (*streampb.StreamReplicationStatus, error) {
return nil, nil
}

// Close implements the Client interface.
Expand Down Expand Up @@ -130,7 +131,7 @@ func ExampleClient() {
ts := ingested.ts
ingested.Unlock()

if err := client.Heartbeat(ctx, id, ts); err != nil {
if _, err := client.Heartbeat(ctx, id, ts); err != nil {
return err
}
}
Expand Down
16 changes: 6 additions & 10 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,31 +75,27 @@ func (p *partitionedStreamClient) Create(
// Heartbeat implements Client interface.
func (p *partitionedStreamClient) Heartbeat(
ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp,
) error {
) (*streampb.StreamReplicationStatus, error) {
conn, err := p.srcDB.Conn(ctx)
if err != nil {
return err
return nil, err
}

row := conn.QueryRowContext(ctx,
`SELECT crdb_internal.replication_stream_progress($1, $2)`, streamID, consumed.String())
if row.Err() != nil {
return errors.Wrapf(row.Err(), "Error in sending heartbeats to replication stream %d", streamID)
return nil, errors.Wrapf(row.Err(), "Error in sending heartbeats to replication stream %d", streamID)
}

var rawStatus []byte
if err := row.Scan(&rawStatus); err != nil {
return err
return nil, err
}
var status streampb.StreamReplicationStatus
if err := protoutil.Unmarshal(rawStatus, &status); err != nil {
return err
}
// TODO(casper): add observability for stream protected timestamp
if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE {
return streamingccl.NewStreamStatusErr(streamID, status.StreamStatus)
return nil, err
}
return nil
return &status, nil
}

// postgresURL converts an SQL serving address into a postgres URL.
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package streamclient
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"math/rand"
"net/url"
"strconv"
Expand Down Expand Up @@ -259,8 +260,8 @@ func (m *randomStreamClient) Create(
// Heartbeat implements the Client interface.
func (m *randomStreamClient) Heartbeat(
ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp,
) error {
return nil
) (*streampb.StreamReplicationStatus, error) {
return nil, nil
}

// getDescriptorAndNamespaceKVForTableID returns the namespace and descriptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type streamIngestionFrontier struct {
heartbeatSender *heartbeatSender

lastPartitionUpdate time.Time
partitionProgress map[string]jobspb.StreamIngestionProgress_PartitionProgress
partitionProgress map[string]*jobspb.StreamIngestionProgress_PartitionProgress
}

var _ execinfra.Processor = &streamIngestionFrontier{}
Expand Down Expand Up @@ -101,7 +101,7 @@ func newStreamIngestionFrontierProcessor(
input: input,
highWaterAtStart: spec.HighWaterAtStart,
frontier: frontier,
partitionProgress: make(map[string]jobspb.StreamIngestionProgress_PartitionProgress),
partitionProgress: make(map[string]*jobspb.StreamIngestionProgress_PartitionProgress),
heartbeatSender: heartbeatSender,
}
if err := sf.Init(
Expand Down Expand Up @@ -132,6 +132,7 @@ type heartbeatSender struct {
streamID streaming.StreamID
frontierUpdates chan hlc.Timestamp
frontier hlc.Timestamp
streamStatus *streampb.StreamReplicationStatus
flowCtx *execinfra.FlowCtx
// cg runs the heartbeatSender thread.
cg ctxgroup.Group
Expand All @@ -158,10 +159,10 @@ func newHeartbeatSender(
}, nil
}

func (h *heartbeatSender) maybeHeartbeat(ctx context.Context, frontier hlc.Timestamp) error {
func (h *heartbeatSender) maybeHeartbeat(ctx context.Context, frontier hlc.Timestamp) (*streampb.StreamReplicationStatus, error) {
heartbeatFrequency := streamingccl.StreamReplicationConsumerHeartbeatFrequency.Get(&h.flowCtx.EvalCtx.Settings.SV)
if h.lastSent.Add(heartbeatFrequency).After(timeutil.Now()) {
return nil
return nil, nil
}
h.lastSent = timeutil.Now()
return h.client.Heartbeat(ctx, h.streamID, frontier)
Expand All @@ -177,7 +178,7 @@ func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context) {
timer := time.NewTimer(streamingccl.StreamReplicationConsumerHeartbeatFrequency.
Get(&h.flowCtx.EvalCtx.Settings.SV))
defer timer.Stop()
unknownStatusErr := log.Every(1 * time.Minute)
unknownStreamStatusRetryErr := log.Every(1 * time.Minute)
for {
select {
case <-ctx.Done():
Expand All @@ -190,24 +191,26 @@ func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context) {
case frontier := <-h.frontierUpdates:
h.frontier.Forward(frontier)
}
err := h.maybeHeartbeat(ctx, h.frontier)
if err == nil {
continue
var err error
h.streamStatus, err = h.maybeHeartbeat(ctx, h.frontier)

// TODO(casper): add unit tests to test different kinds of client errors.
if err != nil {
return err
}

var se streamingccl.StreamStatusErr
if !errors.As(err, &se) {
return errors.Wrap(err, "unknown stream status error")
if h.streamStatus == nil || h.streamStatus.StreamStatus == streampb.StreamReplicationStatus_STREAM_ACTIVE{
continue
}

if se.StreamStatus == streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
if unknownStatusErr.ShouldLog() {
log.Warningf(ctx, "replication stream %d has unknown status error", se.StreamID)
if h.streamStatus.StreamStatus == streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
if unknownStreamStatusRetryErr.ShouldLog() {
log.Warningf(ctx, "replication stream %d has unknown stream status error and will retry later", h.streamID)
}
continue
}
// The replication stream is either paused or inactive.
return err
return streamingccl.NewStreamStatusErr(h.streamID, h.streamStatus.StreamStatus)
}
}
err := errors.CombineErrors(sendHeartbeats(), h.client.Close())
Expand Down Expand Up @@ -253,18 +256,18 @@ func (sf *streamIngestionFrontier) Next() (
break
}

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
}

var frontierChanged bool
var err error
if frontierChanged, err = sf.noteResolvedTimestamps(row[0]); err != nil {
sf.MoveToDraining(err)
break
}

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
}

// Send back a row to the job so that it can update the progress.
newResolvedTS := sf.frontier.Frontier()
select {
Expand Down Expand Up @@ -371,7 +374,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
partitionKey := span.Key
partition := string(partitionKey)
if curFrontier, ok := partitionFrontiers[partition]; !ok {
partitionFrontiers[partition] = jobspb.StreamIngestionProgress_PartitionProgress{
partitionFrontiers[partition] = &jobspb.StreamIngestionProgress_PartitionProgress{
IngestedTimestamp: timestamp,
}
} else if curFrontier.IngestedTimestamp.Less(timestamp) {
Expand All @@ -385,9 +388,11 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
return job.FractionProgressed(ctx, nil, /* txn */
func(ctx context.Context, details jobspb.ProgressDetails) float32 {
prog := details.(*jobspb.Progress_StreamIngest).StreamIngest
prog.PartitionProgress = partitionFrontiers
// "FractionProgressed" isn't relevant on jobs that are streaming in
// changes.
prog.PartitionProgress = make(map[string]jobspb.StreamIngestionProgress_PartitionProgress)
for partition, progress := range partitionFrontiers {
prog.PartitionProgress[partition] = *progress
}
// "FractionProgressed" isn't relevant on jobs that are streaming in changes.
return 0.0
},
)
Expand Down
10 changes: 4 additions & 6 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)
Expand All @@ -33,6 +34,7 @@ func ingest(
streamAddress streamingccl.StreamAddress,
oldTenantID roachpb.TenantID,
newTenantID roachpb.TenantID,
streamID streaming.StreamID,
startTime hlc.Timestamp,
progress jobspb.Progress,
ingestionJobID jobspb.JobID,
Expand All @@ -44,11 +46,6 @@ func ingest(
}
ingestWithClient := func() error {
// TODO(dt): if there is an existing stream ID, reconnect to it.
streamID, err := client.Create(ctx, oldTenantID)
if err != nil {
return err
}

topology, err := client.Plan(ctx, streamID)
if err != nil {
return err
Expand Down Expand Up @@ -105,7 +102,8 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter

// Start ingesting KVs from the replication stream.
streamAddress := streamingccl.StreamAddress(details.StreamAddress)
return ingest(resumeCtx, p, streamAddress, details.TenantID, details.NewTenantID, details.StartTime, s.job.Progress(), s.job.ID())
return ingest(resumeCtx, p, streamAddress, details.TenantID, details.NewTenantID,
streaming.StreamID(details.StreamID), details.StartTime, s.job.Progress(), s.job.ID())
}

// revertToCutoverTimestamp reads the job progress for the cutover time and
Expand Down
52 changes: 49 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"fmt"
"net/url"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -20,6 +21,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingtest"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand All @@ -34,13 +36,46 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
json2 "github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, stats string) {
fetchValueKey := func(j json2.JSON, key string) json2.JSON {
val, err := j.FetchValKey(key)
require.NoError(t, err)
return val
}

parseInt64 := func(s string) int64 {
res, err := strconv.Atoi(s)
require.NoError(t, err)
return int64(res)
}

statsJson, err := json2.ParseJSON(stats)
require.NoError(t, err)

ingestionProgress := fetchValueKey(statsJson, "ingestion_progress")
require.Equal(t, cutoverTime.UnixNano(),
parseInt64(fetchValueKey(fetchValueKey(ingestionProgress, "cutover_time"), "wall_time").String()))

partitionProgressIter, err := fetchValueKey(ingestionProgress, "partition_progress").ObjectIter()
require.NoError(t, err)
for partitionProgressIter.Next() {
require.Less(t, cutoverTime.UnixNano(), parseInt64(fetchValueKey(fetchValueKey(
partitionProgressIter.Value(), "ingested_timestamp"), "wall_time").String()))
}

require.Equal(t, streamID, parseInt64(fetchValueKey(statsJson, "stream_id").String()))
require.Equal(t, strconv.Itoa(int(streampb.StreamReplicationStatus_STREAM_INACTIVE)),
fetchValueKey(fetchValueKey(statsJson, "stream_replication_status"), "stream_status").String())
}

// TestTenantStreaming tests that tenants can stream changes end-to-end.
func TestTenantStreaming(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand All @@ -61,7 +96,9 @@ func TestTenantStreaming(t *testing.T) {
// Start tenant server in the source cluster.
tenantID := serverutils.TestTenantID()
_, tenantConn := serverutils.StartTenant(t, source, base.TestTenantArgs{TenantID: tenantID})
defer tenantConn.Close()
defer func() {
require.NoError(t, tenantConn.Close())
}()
// sourceSQL refers to the tenant generating the data.
sourceSQL := sqlutils.MakeSQLRunner(tenantConn)

Expand All @@ -86,8 +123,9 @@ SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s';
destSQL := hDest.SysDB
destSQL.ExecMultiple(t, strings.Split(`
SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '2s';
SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval = '5us';
SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval = '500ms';
SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval = '100ms';
SET CLUSTER SETTING streaming.partition_progress_frequency = '100ms';
SET enable_experimental_stream_replication = true;
`,
";")...)
Expand All @@ -96,7 +134,7 @@ SET enable_experimental_stream_replication = true;
pgURL, cleanupSink := sqlutils.PGUrl(t, source.ServingSQLAddr(), t.Name(), url.User(security.RootUser))
defer cleanupSink()

var ingestionJobID int
var ingestionJobID, streamProducerJobID int64
var startTime string
sourceSQL.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&startTime)

Expand All @@ -108,6 +146,11 @@ SET enable_experimental_stream_replication = true;
pgURL.String(),
).Scan(&ingestionJobID)

sourceDBRunner.CheckQueryResultsRetry(t,
"SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'STREAM REPLICATION'", [][]string{{"1"}})
sourceDBRunner.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'STREAM REPLICATION'").
Scan(&streamProducerJobID)

sourceSQL.Exec(t, `
CREATE DATABASE d;
CREATE TABLE d.t1(i int primary key, a string, b string);
Expand Down Expand Up @@ -138,6 +181,9 @@ INSERT INTO d.t2 VALUES (2);
ingestionJobID, cutoverTime)

jobutils.WaitForJobToSucceed(t, destSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(t, sourceDBRunner, jobspb.JobID(streamProducerJobID))

verifyIngestionStats(t, streamProducerJobID, cutoverTime, destSQL.QueryStr(t, "SELECT crdb_internal.stream_ingestion_stats($1)", ingestionJobID)[0][0])

query := "SELECT * FROM d.t1"
sourceData := sourceSQL.QueryStr(t, query)
Expand Down
Loading

0 comments on commit 50aa866

Please sign in to comment.