
79748: streamingccl: add a builtin function that retrieves stream ingestion statistics r=gh-casper a=gh-casper

Support crdb_internal.stream_ingestion_stats(ingestion_job_id) to
retrieve the running progress of a stream ingestion job.
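
For illustration, a minimal sketch of calling the new builtin from Go via the standard database/sql package; the connection string and the job ID below are placeholders, and the builtin simply returns a JSONB statistics summary for the job.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // any PostgreSQL-wire driver works with CockroachDB
)

func main() {
	// Placeholder connection string.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	const ingestionJobID = 123456 // hypothetical stream ingestion job ID
	var statsJSON string
	// The builtin returns a JSONB statistics summary for the ingestion job.
	if err := db.QueryRow(
		`SELECT crdb_internal.stream_ingestion_stats($1)`, ingestionJobID,
	).Scan(&statsJSON); err != nil {
		log.Fatal(err)
	}
	fmt.Println(statsJSON)
}
```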

Release note (sql change): Support crdb_internal.stream_ingestion_stats(ingestion_job_id)
to retrieve the running progress of a stream ingestion job.

Release Justification: Cat 4.

Closes: #57430

81598: obsservice: initial commit r=andreimatei a=andreimatei

These are the humble beginnings of an Observability Service for CRDB. In
time, this is meant to take over a lot of the observability
functionality, extracting it from the database itself. For now, all
there is here is an HTTP reverse proxy, able to route HTTP requests to a
CRDB node (or cluster, through a load balancer). At least for the
transitioning period, if not forever, the Obs Service will route HTTP
paths that it doesn't understand to the Cockroach cluster being
monitored.

The HTTP proxy accepts its own certificates and can serve HTTPS
independently of CRDB (which might be running in --insecure mode and not
serving HTTPS). However, if CRDB is serving HTTPS, then the Obs Service
must also be configured with a certificate so it can also serve HTTPS.
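
Purely as an illustration of the proxy idea (this is not the Obs Service code), a minimal HTTP reverse proxy that serves HTTPS with its own certificate can be sketched with the Go standard library; the CRDB address and certificate paths below are placeholders.

```go
package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
)

func main() {
	// Forward any path we don't handle ourselves to the monitored CRDB node
	// (or to a load balancer in front of the cluster).
	crdb, err := url.Parse("https://crdb-node:8080") // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	proxy := httputil.NewSingleHostReverseProxy(crdb)

	mux := http.NewServeMux()
	// Obs Service endpoints would be registered here; everything else falls
	// through to the proxy.
	mux.Handle("/", proxy)

	// Serve HTTPS with the Obs Service's own certificate, independently of
	// whether CRDB itself is serving HTTPS. File paths are placeholders.
	log.Fatal(http.ListenAndServeTLS(":8081", "obs.crt", "obs.key", mux))
}
```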

The code of the Obs Service is split in between a binary (minimal
shell), and a library. The idea is to keep the door open for other repos
to extend the Obs Service with extra functionality (e.g. the CC managed
service might want to add cloud-specific observability features). For
such purposes, I've considered making a dedicated Go module for the Obs
Service so that source-level imports of the Obs Service would be as
ergonomic as Go wants them to be. I've put a decent amount of work into
playing with this but, ultimately, I've decided against it, at least for
now. The problem is that the Obs Service wants to take minor code
dependencies on random CRDB libraries (syncutil, log, etc.) and the
cockroach Go module is fairly broken (our git tags look like we do
semantic versioning, but we don't actually do it). The Go tooling has a
terrible time with the cockroach module. Also, our code is generally not
`go build`-able. If the Obs Service were a dedicated module, it would
need to take a dependency on the cockroach module, which would negate
the win for people who want to import it (in fact, it'd make matters
worse for importers because the nasty cockroach dependency would be more
hidden). An alternative I considered was to start creating modules
for all of the dependencies of the Obs Service. But if CRDB then needed
to depend on these modules, there would be all sorts of problems.

Release note: None

82041: storage: clear range keys in `Writer.Clear*Range` methods r=jbowens a=erikgrinaker

This patch clears range keys in the `Writer` methods `ClearRawRange`,
`ClearMVCCRange`, and `ClearMVCCIteratorRange`, as well as in the
`ClearRangeWithHeuristic` helper.

Range keys are not cleared in `ClearMVCCVersions`, since this method is
specifically for clearing MVCC point key versions, and it is not
possible to clear range keys between versions of the same point key.
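
To illustrate the intended semantics only, here is a toy in-memory sketch; none of the types or signatures below match the real storage `Writer` API. Clearing a raw span removes both the point keys and any range keys overlapping it.

```go
package main

import "fmt"

// toyRangeKey is a simplified stand-in for an MVCC range key: just a span.
// The real types in pkg/storage are much richer.
type toyRangeKey struct{ start, end string }

type toyWriter struct {
	points    map[string]string // point key -> value
	rangeKeys []toyRangeKey
}

// clearRawRange mirrors the intended semantics described above: both point
// keys and overlapping range keys in [start, end) are removed. Overlap
// handling is simplified; a real implementation truncates partial overlaps.
func (w *toyWriter) clearRawRange(start, end string) {
	for k := range w.points {
		if k >= start && k < end {
			delete(w.points, k)
		}
	}
	kept := w.rangeKeys[:0]
	for _, rk := range w.rangeKeys {
		if rk.end <= start || rk.start >= end { // keep non-overlapping range keys
			kept = append(kept, rk)
		}
	}
	w.rangeKeys = kept
}

func main() {
	w := &toyWriter{
		points:    map[string]string{"a": "1", "c": "2", "f": "3"},
		rangeKeys: []toyRangeKey{{"b", "d"}, {"x", "z"}},
	}
	w.clearRawRange("a", "e")
	fmt.Println(w.points, w.rangeKeys) // map[f:3] [{x z}]
}
```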

Touches #70412.

Release note: None

82377: sql: use declarative schema changer for add column with unique r=fqazi a=fqazi

Previously, the declarative schema changer was disabled
when adding a column with a unique constraint, since we
didn't have a complete set of elements / logic for adding
secondary indexes. This was inadequate because operations
would not be ordered correctly and rollbacks could potentially
break. To address this, this patch enables support and
addresses the missing pieces to set up the secondary indexes
correctly, such as adding suffix columns, ordering operations
correctly, and returning appropriate errors.
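
As a hypothetical illustration of the statement class affected (the table, column, and connection string are invented), run through the standard database/sql package:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any PostgreSQL-wire driver works with CockroachDB
)

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Adding a column with a UNIQUE constraint implicitly builds a secondary
	// index; with this change that index is planned by the declarative schema
	// changer (suffix columns, operation ordering, rollback on failure).
	if _, err := db.Exec(`ALTER TABLE t ADD COLUMN v STRING UNIQUE`); err != nil {
		log.Fatal(err)
	}
}
```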

Release note: None

82380: util/log: rework the bufferSink r=andreimatei a=andreimatei

This is a rewrite of bufferSink (now called bufferedSink). As opposed to
before, the code is smaller, more efficient, and, most importantly,
simpler. I couldn't understand the previous code well, and I think it
made new additions to it (in particular, adding a shutdown sequence)
hard to analyze. The semantics of this buffer are now better and simpler.

Before, a log message traveled through a sequence of two buffers
before being eventually delivered to the wrapped sink:
1. The message was first placed into an accumulator buffer.
2. Then, when that buffer grew too old or too large, the accumulator
   buffer was placed into a queue to be flushed later, and the
   accumulator was reset.
3. There was a limit on the size of this queue of batches to
   be flushed. When the queue was full, new messages were dropped.
There were two goroutines managing these two buffers, which was one too
many.

This behavior of queuing multiple, distinct flushes was peculiar at
best. There's generally no reason not to coalesce all the flushes into
one, thus speeding them up and reducing the risk of message drops. The
bufferSink's memory limit was not explicit, and was difficult to
infer: it was set indirectly from the combination of limits on the two
buffers, expressed in different units.

Now, the sequence of two buffers (the accumulator and the flush queue)
is gone. There's a single buffer accumulating new messages, plus at most
one in-flight flush which captures its buffer. As soon as a flush is
triggered, the buffer is reset. The buffer is now configured with a
memory limit and, when that limit is reached, the _oldest_ messages in
the buffer are dropped, instead of the newest. This, combined with the
blocking semantics of the forceSync option, means that a forceSync
message is never dropped (unless its size is, by itself, larger than the
buffer limit). The number of goroutines running for the bufferSink drops
from two to one.
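
A heavily simplified sketch of these semantics, with invented names that do not match the real util/log code: a single buffer, a byte limit that drops the oldest messages, and at most one in-flight flush that captures and resets the buffer.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sketchSink illustrates the semantics described above, not the real
// util/log types.
type sketchSink struct {
	mu       sync.Mutex
	buf      []string
	size     int
	flushing bool                 // at most one flush is in flight
	limit    int                  // approximate memory limit for buffered messages
	flushFn  func(batch []string) // delivery to the wrapped sink
}

func (s *sketchSink) output(msg string) {
	s.mu.Lock()
	s.buf = append(s.buf, msg)
	s.size += len(msg)
	// Drop the oldest messages, not the newest, once the limit is exceeded.
	for s.size > s.limit && len(s.buf) > 1 {
		s.size -= len(s.buf[0])
		s.buf = s.buf[1:]
	}
	// The real sink triggers flushes on staleness or size thresholds (and
	// supports a blocking forceSync); here we simply flush whenever no flush
	// is already in flight.
	var batch []string
	if !s.flushing {
		s.flushing = true
		batch, s.buf, s.size = s.buf, nil, 0 // the flush captures the buffer
	}
	s.mu.Unlock()

	if batch != nil {
		go func() {
			s.flushFn(batch)
			s.mu.Lock()
			s.flushing = false
			s.mu.Unlock()
		}()
	}
}

func main() {
	s := &sketchSink{limit: 1 << 10, flushFn: func(b []string) { fmt.Println(b) }}
	s.output("hello")
	s.output("world")
	time.Sleep(100 * time.Millisecond) // let the async flush run
}
```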

Release note: None

82436: server, ui: handle null plan gist in getStatementDetailsPerPlanHash r=ericharmeling a=ericharmeling

This PR adds a new function that handles null plan gists in `getStatementDetailsPerPlanHash`. The PR also adds some logic to hide null plan gists in the SqlBox of the Statement Details page.

Here's a screenshot of the Statement Details page when the plan gist is null:

<img width="1024" alt="Screen Shot 2022-06-03 at 7 18 59 PM" src="https://user-images.githubusercontent.com/27286675/171966978-6a444297-70d4-4ef3-8afa-5068e3f35d9e.png">


Fixes #82095.

We probably want to figure out why there are null values in [`planner.instrumentation.planGist`](https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/executor_statement_metrics.go#L212) in the first place. `getStatementDetailsPerPlanHash` is pulling from the system table populated with the plan gist from `planner.instrumentation.planGist`. CC `@cucaroach`

Release note: None

82471: dev: allow `--rewrite` when testing many targets with `/...` r=rail a=rickystewart

Closes #82053.

Release note: None

82479: keys: resolve subtle non-bug by exporting RaftLogKeyFromPrefix r=tbg,erikgrinaker a=nvanbenschoten

This commit resolves a subtle interaction that came tantalizingly close to a bug in `StateLoader.LoadLastIndex`. The method uses the `StateLoader`'s underlying `keys.RangeIDPrefixBuf` to generate two keys (`RaftLogPrefix` and `RaftLogKey`) that are in use at the same time.

`RangeIDPrefixBuf` avoids heap allocations by sharing a single byte slice across all keys that it generates. This is why the type has the comment:
> The generated keys are only valid until the next call to one of the key generation methods.

As would be expected, given this comment, the second use of the `RangeIDPrefixBuf` overwrote the buffer and invalidated the first key. However, it happened to get lucky and generate a new key with the same prefix as the old key. As a result, the contents of the first key did not change.

To make this aliasing more explicit and avoid this becoming a real bug in the future, we introduce a new `RaftLogKeyFromPrefix` function that callers can use to generate raft log entry keys from a raft log prefix.

We then use this new function to avoid some redundant encoding work elsewhere due to repeated calls to `RaftLogKey`.
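
A standalone toy (not the real `keys` package) showing the aliasing pattern: keys generated from a shared buffer reuse the same backing array, so a second call silently rewrites the first key's bytes, and a FromPrefix-style helper makes the dependence on an explicit prefix visible at the call site.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// prefixBuf mimics the allocation-avoidance pattern: every generated key
// shares the same backing slice, so a key is only valid until the next call.
type prefixBuf []byte

func (b prefixBuf) raftLogPrefix() []byte {
	return append(b[:0], "/raftlog/"...)
}

func (b prefixBuf) raftLogKey(index uint64) []byte {
	// Regenerates the prefix in the same backing array, overwriting whatever
	// raftLogPrefix returned earlier.
	return binary.BigEndian.AppendUint64(b.raftLogPrefix(), index)
}

// raftLogKeyFromPrefix is the analogue of the new helper: it builds the key
// from an explicit prefix instead of silently regenerating it from the
// shared buffer.
func raftLogKeyFromPrefix(prefix []byte, index uint64) []byte {
	return binary.BigEndian.AppendUint64(prefix, index)
}

func main() {
	buf := make(prefixBuf, 0, 64)

	prefix := buf.raftLogPrefix()
	key := buf.raftLogKey(7) // overwrites the shared buffer behind `prefix`
	// `prefix` still looks intact only because the second key happens to
	// start with the same prefix bytes, which is exactly the near-bug
	// described above.
	fmt.Printf("prefix=%q key=%q\n", prefix, key)

	// With the explicit helper, the dependence on the prefix is visible.
	key2 := raftLogKeyFromPrefix(buf.raftLogPrefix(), 7)
	fmt.Printf("key2=%q\n", key2)
}
```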

82520: cli,sql: fix reported result columns for EXPLAIN ANALYZE r=yuzefovich a=rafiss

Fixes #82502

This fixes an issue where EXPLAIN ANALYZE would report a RowDescription
for both the EXPLAIN and the statement being explained. If the
statement had a different number of result columns, this would confuse
the CLI.
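
For illustration, a hedged sketch of the expected client-side behavior using database/sql; the connection string is a placeholder and the explain column name in the comment is a guess. EXPLAIN ANALYZE should describe only the explain output column, no matter how many columns the inner statement produces.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // any PostgreSQL-wire driver works with CockroachDB
)

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The inner statement returns three columns, but the client should only
	// ever see the row description of the EXPLAIN output itself.
	rows, err := db.Query(`EXPLAIN ANALYZE SELECT 1, 2, 3`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(cols) // expect a single explain output column, e.g. ["info"]

	for rows.Next() {
		var line string
		if err := rows.Scan(&line); err != nil {
			log.Fatal(err)
		}
		fmt.Println(line)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
```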

No release note, since this bug was not released.

Release note: None

82521: roachtest: fix a minor bug with tpch_concurrency roachtest r=yuzefovich a=yuzefovich

`tpch_concurrency` pushes the cluster to its limits, so it is expected
that OOMs occur. Previously, in a recently added code path, we would
fail the test if an error occurred when running a debugging statement,
yet that error is expected and should be returned.

Fixes: #82510.

Release note: None

Co-authored-by: Casper <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Eric Harmeling <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
10 people committed Jun 7, 2022
11 parents 1fd6c45 + b96b07d + 8c93246 + 72b19bc + 5232627 + 32d749b + ca9787d + e7bc3b3 + c5497ba + f548996 + 01cfeb0 commit d9fd5bf
Showing 81 changed files with 4,186 additions and 870 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -291,6 +291,7 @@
/pkg/util/stop @cockroachdb/server-prs
/pkg/util/tracing @cockroachdb/obs-inf-prs
/pkg/workload/ @cockroachdb/sql-experience-noreview
/pkg/obsservice/ @cockroachdb/obs-inf-prs

# Allow the security team to have insight into changes to
# authn/authz code.
2 changes: 2 additions & 0 deletions .gitignore
@@ -26,6 +26,8 @@ work-Fuzz*
*-fuzz.zip
# vendoring by `go mod vendor` may produce this file temporarily
/.vendor.tmp.*
# The Observability Service binary.
/obsservice

# Custom or private env vars (e.g. internal keys, access tokens, etc).
customenv.mk
3 changes: 2 additions & 1 deletion docs/generated/logsinks.md
@@ -389,6 +389,7 @@ set to "NONE" to disable buffering. Example configuration:
buffering:
max-staleness: 20s
flush-trigger-size: 25KB
max-buffer-size: 10MB
sinks:
file-groups:
health:
@@ -404,6 +405,6 @@ set to "NONE" to disable buffering. Example configuration:
|--|--|
| `max-staleness` | the maximum time a log message will sit in the buffer before a flush is triggered. |
| `flush-trigger-size` | the number of bytes that will trigger the buffer to flush. |
| `max-in-flight` | the maximum number of buffered flushes before messages start being dropped. |
| `max-buffer-size` | the limit on the size of the messages that are buffered. If this limit is exceeded, messages are dropped. The limit is expected to be higher than FlushTriggerSize. A buffer is flushed as soon as FlushTriggerSize is reached, and a new buffer is created once the flushing is started. Only one flushing operation is active at a time. |


2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
@@ -2619,6 +2619,8 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
</span></td></tr>
<tr><td><a name="crdb_internal.start_replication_stream"></a><code>crdb_internal.start_replication_stream(tenant_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.stream_ingestion_stats"></a><code>crdb_internal.stream_ingestion_stats(job_id: <a href="int.html">int</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>This function can be used on the ingestion side to get a statistics summary of a stream ingestion job in json format.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.stream_partition"></a><code>crdb_internal.stream_partition(stream_id: <a href="int.html">int</a>, partition_spec: <a href="bytes.html">bytes</a>) &rarr; tuple{bytes AS stream_event}</code></td><td><span class="funcdesc"><p>Stream partition data</p>
</span></td></tr></tbody>
</table>
7 changes: 6 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
@@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -55,7 +56,11 @@ type Client interface {
// that source cluster protected timestamp _may_ be advanced up to the passed ts
// (which may be zero if no progress has been made e.g. during backfill).
// TODO(dt): ts -> checkpointToken.
Heartbeat(ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp) error
Heartbeat(
ctx context.Context,
streamID streaming.StreamID,
consumed hlc.Timestamp,
) (streampb.StreamReplicationStatus, error)

// Plan returns a Topology for this stream.
// TODO(dt): separate target argument from address argument.
7 changes: 4 additions & 3 deletions pkg/ccl/streamingccl/streamclient/client_test.go
@@ -14,6 +14,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -43,8 +44,8 @@ func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Top
// Heartbeat implements the Client interface.
func (sc testStreamClient) Heartbeat(
ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp,
) error {
return nil
) (streampb.StreamReplicationStatus, error) {
return streampb.StreamReplicationStatus{}, nil
}

// Close implements the Client interface.
@@ -130,7 +131,7 @@ func ExampleClient() {
ts := ingested.ts
ingested.Unlock()

if err := client.Heartbeat(ctx, id, ts); err != nil {
if _, err := client.Heartbeat(ctx, id, ts); err != nil {
return err
}
}
17 changes: 7 additions & 10 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
@@ -75,31 +75,28 @@ func (p *partitionedStreamClient) Create(
// Heartbeat implements Client interface.
func (p *partitionedStreamClient) Heartbeat(
ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp,
) error {
) (streampb.StreamReplicationStatus, error) {
conn, err := p.srcDB.Conn(ctx)
if err != nil {
return err
return streampb.StreamReplicationStatus{}, err
}

row := conn.QueryRowContext(ctx,
`SELECT crdb_internal.replication_stream_progress($1, $2)`, streamID, consumed.String())
if row.Err() != nil {
return errors.Wrapf(row.Err(), "Error in sending heartbeats to replication stream %d", streamID)
return streampb.StreamReplicationStatus{},
errors.Wrapf(row.Err(), "Error in sending heartbeats to replication stream %d", streamID)
}

var rawStatus []byte
if err := row.Scan(&rawStatus); err != nil {
return err
return streampb.StreamReplicationStatus{}, err
}
var status streampb.StreamReplicationStatus
if err := protoutil.Unmarshal(rawStatus, &status); err != nil {
return err
return streampb.StreamReplicationStatus{}, err
}
// TODO(casper): add observability for stream protected timestamp
if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE {
return streamingccl.NewStreamStatusErr(streamID, status.StreamStatus)
}
return nil
return status, nil
}

// postgresURL converts an SQL serving address into a postgres URL.
22 changes: 12 additions & 10 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go
@@ -101,27 +101,29 @@ INSERT INTO d.t2 VALUES (2);
require.True(t, testutils.IsError(err, fmt.Sprintf("job with ID %d does not exist", 999)), err)

expectStreamState(streamID, jobs.StatusRunning)
require.NoError(t, client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}))
status, err := client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.NoError(t, err)
require.Equal(t, streampb.StreamReplicationStatus_STREAM_ACTIVE, status.StreamStatus)

// Pause the underlying producer job of the replication stream
h.SysDB.Exec(t, `PAUSE JOB $1`, streamID)
expectStreamState(streamID, jobs.StatusPaused)
err = client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.True(t, testutils.IsError(err,
fmt.Sprintf("replication stream %d is not running, status is STREAM_PAUSED", streamID)), err)
status, err = client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.NoError(t, err)
require.Equal(t, streampb.StreamReplicationStatus_STREAM_PAUSED, status.StreamStatus)

// Cancel the underlying producer job of the replication stream
h.SysDB.Exec(t, `CANCEL JOB $1`, streamID)
expectStreamState(streamID, jobs.StatusCanceled)

err = client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.True(t, testutils.IsError(err,
fmt.Sprintf("replication stream %d is not running, status is STREAM_INACTIVE", streamID)), err)
status, err = client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.NoError(t, err)
require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, status.StreamStatus)

// Non-existent stream is not active in the source cluster.
err = client.Heartbeat(ctx, 999, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.True(t, testutils.IsError(err,
fmt.Sprintf("replication stream %d is not running, status is STREAM_INACTIVE", 999)), err)
status, err = client.Heartbeat(ctx, 999, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.NoError(t, err)
require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, status.StreamStatus)

// Testing client.Subscribe()
makePartitionSpec := func(tables ...string) *streampb.StreamPartitionSpec {
8 changes: 5 additions & 3 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -17,6 +17,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -62,6 +63,7 @@ const (
// TenantID specifies the ID of the tenant we are ingesting data into. This
// allows the client to prefix the generated KVs with the appropriate tenant
// prefix.
// TODO(casper): ensure this should be consistent across the usage of APIs
TenantID = "TENANT_ID"
// IngestionDatabaseID is the ID used in the generated table descriptor.
IngestionDatabaseID = 50 /* defaultDB */
@@ -258,9 +260,9 @@ func (m *randomStreamClient) Create(

// Heartbeat implements the Client interface.
func (m *randomStreamClient) Heartbeat(
ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp,
) error {
return nil
ctx context.Context, _ streaming.StreamID, _ hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
return streampb.StreamReplicationStatus{}, nil
}

// getDescriptorAndNamespaceKVForTableID returns the namespace and descriptor
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -82,6 +82,7 @@ go_test(
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/streamingccl/streamingtest",
"//pkg/ccl/streamingccl/streampb",
"//pkg/ccl/streamingccl/streamproducer",
"//pkg/ccl/utilccl",
"//pkg/jobs",
@@ -110,6 +111,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
13 changes: 11 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
@@ -9,7 +9,9 @@
package streamingest

import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -23,9 +25,16 @@ type streamIngestManagerImpl struct{}

// CompleteStreamIngestion implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) CompleteStreamIngestion(
evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, cutoverTimestamp hlc.Timestamp,
evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp,
) error {
return completeStreamIngestion(evalCtx, txn, streamID, cutoverTimestamp)
return completeStreamIngestion(evalCtx, txn, ingestionJobID, cutoverTimestamp)
}

// GetStreamIngestionStats implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) GetStreamIngestionStats(
evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID,
) (*streampb.StreamIngestionStats, error) {
return getStreamIngestionStats(evalCtx, txn, ingestionJobID)
}

func newStreamIngestManagerWithPrivilegesCheck(
@@ -69,7 +69,7 @@ type streamIngestionFrontier struct {
heartbeatSender *heartbeatSender

lastPartitionUpdate time.Time
partitionProgress map[string]jobspb.StreamIngestionProgress_PartitionProgress
partitionProgress map[string]*jobspb.StreamIngestionProgress_PartitionProgress
}

var _ execinfra.Processor = &streamIngestionFrontier{}
@@ -101,7 +101,7 @@ func newStreamIngestionFrontierProcessor(
input: input,
highWaterAtStart: spec.HighWaterAtStart,
frontier: frontier,
partitionProgress: make(map[string]jobspb.StreamIngestionProgress_PartitionProgress),
partitionProgress: make(map[string]*jobspb.StreamIngestionProgress_PartitionProgress),
heartbeatSender: heartbeatSender,
}
if err := sf.Init(
@@ -158,13 +158,16 @@ func newHeartbeatSender(
}, nil
}

func (h *heartbeatSender) maybeHeartbeat(ctx context.Context, frontier hlc.Timestamp) error {
func (h *heartbeatSender) maybeHeartbeat(
ctx context.Context, frontier hlc.Timestamp,
) (bool, streampb.StreamReplicationStatus, error) {
heartbeatFrequency := streamingccl.StreamReplicationConsumerHeartbeatFrequency.Get(&h.flowCtx.EvalCtx.Settings.SV)
if h.lastSent.Add(heartbeatFrequency).After(timeutil.Now()) {
return nil
return false, streampb.StreamReplicationStatus{}, nil
}
h.lastSent = timeutil.Now()
return h.client.Heartbeat(ctx, h.streamID, frontier)
s, err := h.client.Heartbeat(ctx, h.streamID, frontier)
return true, s, err
}

func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context) {
@@ -177,7 +180,7 @@ func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context) {
timer := time.NewTimer(streamingccl.StreamReplicationConsumerHeartbeatFrequency.
Get(&h.flowCtx.EvalCtx.Settings.SV))
defer timer.Stop()
unknownStatusErr := log.Every(1 * time.Minute)
unknownStreamStatusRetryErr := log.Every(1 * time.Minute)
for {
select {
case <-ctx.Done():
@@ -190,24 +193,24 @@ func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context) {
case frontier := <-h.frontierUpdates:
h.frontier.Forward(frontier)
}
err := h.maybeHeartbeat(ctx, h.frontier)
if err == nil {
continue
sent, streamStatus, err := h.maybeHeartbeat(ctx, h.frontier)
// TODO(casper): add unit tests to test different kinds of client errors.
if err != nil {
return err
}

var se streamingccl.StreamStatusErr
if !errors.As(err, &se) {
return errors.Wrap(err, "unknown stream status error")
if !sent || streamStatus.StreamStatus == streampb.StreamReplicationStatus_STREAM_ACTIVE {
continue
}

if se.StreamStatus == streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
if unknownStatusErr.ShouldLog() {
log.Warningf(ctx, "replication stream %d has unknown status error", se.StreamID)
if streamStatus.StreamStatus == streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
if unknownStreamStatusRetryErr.ShouldLog() {
log.Warningf(ctx, "replication stream %d has unknown stream status error and will retry later", h.streamID)
}
continue
}
// The replication stream is either paused or inactive.
return err
return streamingccl.NewStreamStatusErr(h.streamID, streamStatus.StreamStatus)
}
}
err := errors.CombineErrors(sendHeartbeats(), h.client.Close())
@@ -253,18 +256,18 @@ func (sf *streamIngestionFrontier) Next() (
break
}

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
}

var frontierChanged bool
var err error
if frontierChanged, err = sf.noteResolvedTimestamps(row[0]); err != nil {
sf.MoveToDraining(err)
break
}

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
}

// Send back a row to the job so that it can update the progress.
newResolvedTS := sf.frontier.Frontier()
select {
Expand Down Expand Up @@ -371,7 +374,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
partitionKey := span.Key
partition := string(partitionKey)
if curFrontier, ok := partitionFrontiers[partition]; !ok {
partitionFrontiers[partition] = jobspb.StreamIngestionProgress_PartitionProgress{
partitionFrontiers[partition] = &jobspb.StreamIngestionProgress_PartitionProgress{
IngestedTimestamp: timestamp,
}
} else if curFrontier.IngestedTimestamp.Less(timestamp) {
@@ -385,9 +388,11 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
return job.FractionProgressed(ctx, nil, /* txn */
func(ctx context.Context, details jobspb.ProgressDetails) float32 {
prog := details.(*jobspb.Progress_StreamIngest).StreamIngest
prog.PartitionProgress = partitionFrontiers
// "FractionProgressed" isn't relevant on jobs that are streaming in
// changes.
prog.PartitionProgress = make(map[string]jobspb.StreamIngestionProgress_PartitionProgress)
for partition, progress := range partitionFrontiers {
prog.PartitionProgress[partition] = *progress
}
// "FractionProgressed" isn't relevant on jobs that are streaming in changes.
return 0.0
},
)