From baf89289c7f3301aac7016aea2d99feba43f2a07 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Sun, 17 Jan 2021 19:29:36 -0500 Subject: [PATCH] streamclient: add random stream client This commit introduces a new stream client implementation that generates events of a specific schema for a table ID that is specified by the stream URI. Properties of the stream, such as the frequency of the events and the range of the randomly generated KVs can be controlled with the appropriate parameters specified in the stream address. To use the new stream client the `NewStreamClient` constructor has been modified to accept a stream address. The stream address allows the client to determine which client implementation should be used. Further, the addition of this client exposed a bug in the SST batcher which rejects batches that modify the same key more than once, even if disallowShadowing is set to false. Release note: None --- pkg/ccl/streamingccl/addresses.go | 7 + pkg/ccl/streamingccl/streamclient/BUILD.bazel | 16 +- pkg/ccl/streamingccl/streamclient/client.go | 22 ++ .../streamingccl/streamclient/client_test.go | 7 + .../streamclient/random_stream_client.go | 256 +++++++++++++++ .../streamclient/stream_client.go | 5 - pkg/ccl/streamingccl/streamingest/BUILD.bazel | 3 + .../streamingest/stream_ingestion_job.go | 12 +- .../stream_ingestion_processor.go | 8 +- .../stream_ingestion_processor_test.go | 229 +++++++++++--- pkg/kv/bulk/sst_batcher.go | 11 +- pkg/kv/kvserver/kvserverbase/bulk_adder.go | 6 +- pkg/sql/execinfrapb/processors_bulk_io.pb.go | 293 ++++++++++-------- pkg/sql/execinfrapb/processors_bulk_io.proto | 2 + 14 files changed, 689 insertions(+), 188 deletions(-) create mode 100644 pkg/ccl/streamingccl/streamclient/random_stream_client.go diff --git a/pkg/ccl/streamingccl/addresses.go b/pkg/ccl/streamingccl/addresses.go index 3b38112aa959..8153fd8d3f66 100644 --- a/pkg/ccl/streamingccl/addresses.go +++ b/pkg/ccl/streamingccl/addresses.go @@ -8,10 +8,17 @@ package streamingccl +import "net/url" + // StreamAddress is the location of the stream. The topology of a stream should // be resolvable given a stream address. type StreamAddress string +// URL parses the stream address as a URL. +func (sa StreamAddress) URL() (*url.URL, error) { + return url.Parse(string(sa)) +} + // PartitionAddress is the address where the stream client should be able to // read the events produced by a partition of a stream. // diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index ed0faeea4338..33b33eae133a 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -4,11 +4,25 @@ go_library( name = "streamclient", srcs = [ "client.go", + "random_stream_client.go", "stream_client.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient", visibility = ["//visibility:public"], - deps = ["//pkg/ccl/streamingccl"], + deps = [ + "//pkg/ccl/streamingccl", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/sql", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/catalog/tabledesc", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/util/hlc", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + ], ) go_test( diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 51fec66532d8..df6302ed36d1 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -32,3 +32,25 @@ type Client interface { // encountered while reading the stream. ConsumePartition(ctx context.Context, address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error) } + +// NewStreamClient creates a new stream client based on the stream +// address. +func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) { + var streamClient Client + streamURL, err := streamAddress.URL() + if err != nil { + return streamClient, err + } + + switch streamURL.Scheme { + case TestScheme: + streamClient, err = newRandomStreamClient(streamURL) + if err != nil { + return streamClient, err + } + default: + streamClient = &client{} + } + + return streamClient, nil +} diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 174325bb9001..054cab14a3cf 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -10,6 +10,7 @@ package streamclient import ( "context" + "net/url" "testing" "time" @@ -89,9 +90,15 @@ func TestExampleClientUsage(t *testing.T) { // Ensure that all implementations specified in this test properly close the // eventChannel when the given context is canceled. func TestImplementationsCloseChannel(t *testing.T) { + streamURL, err := url.Parse("test://52") + require.NoError(t, err) + randomClient, err := newRandomStreamClient(streamURL) + require.NoError(t, err) + // TODO: Add SQL client and file client here when implemented. impls := []Client{ &client{}, + randomClient, } for _, impl := range impls { diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go new file mode 100644 index 000000000000..61e7a5b34bdb --- /dev/null +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -0,0 +1,256 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamclient + +import ( + "context" + "math/rand" + "net/url" + "strconv" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +const ( + // RandomStreamSchema is the schema of the KVs emitted by the random stream + // client. + RandomStreamSchema = "CREATE TABLE test (k INT PRIMARY KEY, v INT)" + + // TestScheme is the URI scheme used to create a test load. + TestScheme = "test" + // ValueRangeKey controls the range of the randomly generated values produced + // by this workload. The workload will generate between 0 and this value. + ValueRangeKey = "VALUE_RANGE" + // KVFrequency is the frequency in nanoseconds that the stream will emit + // randomly generated KV events. + KVFrequency = "KV_FREQUENCY" + // KVsPerCheckpoint controls approximately how many KV events should be emitted + // between checkpoint events. + KVsPerCheckpoint = "KVS_PER_CHECKPOINT" +) + +// randomStreamConfig specifies the variables that controls the rate and type of +// events that the generated stream emits. +type randomStreamConfig struct { + valueRange int + kvFrequency time.Duration + kvsPerCheckpoint int +} + +func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { + c := randomStreamConfig{ + valueRange: 100, + kvFrequency: 10 * time.Microsecond, + kvsPerCheckpoint: 100, + } + + var err error + if valueRangeStr := streamURL.Query().Get(ValueRangeKey); valueRangeStr != "" { + c.valueRange, err = strconv.Atoi(valueRangeStr) + if err != nil { + return c, err + } + } + + if kvFreqStr := streamURL.Query().Get(KVFrequency); kvFreqStr != "" { + kvFreq, err := strconv.Atoi(kvFreqStr) + c.kvFrequency = time.Duration(kvFreq) + if err != nil { + return c, err + } + } + + if kvsPerCheckpointStr := streamURL.Query().Get(KVsPerCheckpoint); kvsPerCheckpointStr != "" { + c.kvsPerCheckpoint, err = strconv.Atoi(kvsPerCheckpointStr) + if err != nil { + return c, err + } + } + + return c, nil +} + +// randomStreamClient is a temporary stream client implementation that generates +// random events. +// +// It expects a table with the schema `RandomStreamSchema` to already exist, +// with table ID `` to be used in the URI. Opening the stream client +// on the URI 'test://' will generate random events into this table. +// +// TODO: Move this over to a _test file in the ingestion package when there is a +// real stream client implementation. +type randomStreamClient struct { + baseDesc *tabledesc.Mutable + config randomStreamConfig + + // interceptors can be registered to peek at every event generated by this + // client. + mu struct { + syncutil.Mutex + + interceptors []func(streamingccl.Event) + } +} + +var _ Client = &randomStreamClient{} + +// newRandomStreamClient returns a stream client that generates a random set of +// events on a table with an integer key and integer value for the table with +// the given ID. +func newRandomStreamClient(streamURL *url.URL) (Client, error) { + tableID, err := strconv.Atoi(streamURL.Host) + if err != nil { + return nil, err + } + testTable, err := sql.CreateTestTableDescriptor( + context.Background(), + 50, /* defaultdb */ + descpb.ID(tableID), + RandomStreamSchema, + systemschema.JobsTable.Privileges, + ) + if err != nil { + return nil, err + } + + streamConfig, err := parseRandomStreamConfig(streamURL) + if err != nil { + return nil, err + } + + return &randomStreamClient{ + baseDesc: testTable, + config: streamConfig, + }, nil +} + +// GetTopology implements the Client interface. +func (m *randomStreamClient) GetTopology( + _ streamingccl.StreamAddress, +) (streamingccl.Topology, error) { + panic("not yet implemented") +} + +// ConsumePartition implements the Client interface. +func (m *randomStreamClient) ConsumePartition( + ctx context.Context, _ streamingccl.PartitionAddress, startTime time.Time, +) (chan streamingccl.Event, error) { + eventCh := make(chan streamingccl.Event) + now := timeutil.Now() + if startTime.After(now) { + panic("cannot start random stream client event stream in the future") + } + lastResolvedTime := startTime + + go func() { + defer close(eventCh) + + // rand is not thread safe, so create a random source for each partition. + r := rand.New(rand.NewSource(timeutil.Now().UnixNano())) + kvInterval := m.config.kvFrequency + resolvedInterval := kvInterval * time.Duration(m.config.kvsPerCheckpoint) + + kvTimer := timeutil.NewTimer() + kvTimer.Reset(0) + defer kvTimer.Stop() + + resolvedTimer := timeutil.NewTimer() + resolvedTimer.Reset(0) + defer resolvedTimer.Stop() + + for { + var event streamingccl.Event + select { + case <-kvTimer.C: + kvTimer.Read = true + event = streamingccl.MakeKVEvent(m.makeRandomKey(r, lastResolvedTime)) + kvTimer.Reset(kvInterval) + case <-resolvedTimer.C: + resolvedTimer.Read = true + resolvedTime := timeutil.Now() + hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()} + event = streamingccl.MakeCheckpointEvent(hlcResolvedTime) + lastResolvedTime = resolvedTime + resolvedTimer.Reset(resolvedInterval) + } + + // TODO: Consider keeping an in-memory copy so that tests can verify + // that the data we've ingested is correct. + select { + case eventCh <- event: + case <-ctx.Done(): + return + } + + if len(m.mu.interceptors) > 0 { + m.mu.Lock() + for _, interceptor := range m.mu.interceptors { + if interceptor != nil { + interceptor(event) + } + } + m.mu.Unlock() + } + } + }() + + return eventCh, nil +} + +func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachpb.KeyValue { + tableDesc := m.baseDesc + + // Create a key holding a random integer. + k, err := rowenc.TestingMakePrimaryIndexKey(tableDesc, r.Intn(m.config.valueRange)) + if err != nil { + panic(err) + } + k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID)) + + // Create a value holding a random integer. + valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange))) + valueBuf, err := rowenc.EncodeTableValue( + []byte(nil), tableDesc.Columns[1].ID, valueDatum, []byte(nil)) + if err != nil { + panic(err) + } + var v roachpb.Value + v.SetTuple(valueBuf) + v.ClearChecksum() + v.InitChecksum(k) + + // Generate a timestamp between minTs and now(). + randOffset := int(timeutil.Now().UnixNano()) - int(minTs.UnixNano()) + newTimestamp := rand.Intn(randOffset) + int(minTs.UnixNano()) + v.Timestamp = hlc.Timestamp{WallTime: int64(newTimestamp)} + + return roachpb.KeyValue{ + Key: k, + Value: v, + } +} + +// RegisterInterception implements streamingest.interceptableStreamClient. +func (m *randomStreamClient) RegisterInterception(f func(event streamingccl.Event)) { + m.mu.Lock() + defer m.mu.Unlock() + m.mu.interceptors = append(m.mu.interceptors, f) +} diff --git a/pkg/ccl/streamingccl/streamclient/stream_client.go b/pkg/ccl/streamingccl/streamclient/stream_client.go index b37803b76245..81ebd251a09f 100644 --- a/pkg/ccl/streamingccl/streamclient/stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/stream_client.go @@ -20,11 +20,6 @@ type client struct{} var _ Client = &client{} -// NewStreamClient returns a new mock stream client. -func NewStreamClient() Client { - return &client{} -} - // GetTopology implements the Client interface. func (m *client) GetTopology(_ streamingccl.StreamAddress) (streamingccl.Topology, error) { return streamingccl.Topology{ diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index f78786673122..e10347b69c98 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -52,6 +52,7 @@ go_test( "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", + "//pkg/kv", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", @@ -60,12 +61,14 @@ go_test( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/sem/tree", + "//pkg/testutils", "//pkg/testutils/distsqlutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/timeutil", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 541c82b9fd8b..8e4931a11432 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -30,13 +30,15 @@ type streamIngestionResumer struct { func ingest( ctx context.Context, execCtx sql.JobExecContext, - streamAddress streamingccl.PartitionAddress, + streamAddress streamingccl.StreamAddress, job *jobs.Job, ) error { // Initialize a stream client and resolve topology. - client := streamclient.NewStreamClient() - sa := streamingccl.StreamAddress(streamAddress) - topology, err := client.GetTopology(sa) + client, err := streamclient.NewStreamClient(streamAddress) + if err != nil { + return err + } + topology, err := client.GetTopology(streamAddress) if err != nil { return err } @@ -73,7 +75,7 @@ func (s *streamIngestionResumer) Resume( details := s.job.Details().(jobspb.StreamIngestionDetails) p := execCtx.(sql.JobExecContext) - err := ingest(ctx, p, streamingccl.PartitionAddress(details.StreamAddress), s.job) + err := ingest(ctx, p, details.StreamAddress, s.job) if err != nil { return err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 09b35e8eb80d..05b5786100c9 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -88,17 +88,21 @@ func newStreamIngestionDataProcessor( post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, ) (execinfra.Processor, error) { + streamClient, err := streamclient.NewStreamClient(spec.StreamAddress) + if err != nil { + return nil, err + } + sip := &streamIngestionProcessor{ flowCtx: flowCtx, spec: spec, output: output, curBatch: make([]storage.MVCCKeyValue, 0), - client: streamclient.NewStreamClient(), + client: streamClient, } evalCtx := flowCtx.EvalCtx db := flowCtx.Cfg.DB - var err error sip.batcher, err = bulk.MakeStreamSSTBatcher(sip.Ctx, db, evalCtx.Settings, func() int64 { return storageccl.MaxImportBatchSize(evalCtx.Settings) }) if err != nil { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index e86f0ba3e7dc..581ee81a7420 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -11,7 +11,7 @@ package streamingest import ( "context" "fmt" - "sync" + "strconv" "testing" "time" @@ -19,19 +19,30 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) +type interceptableStreamClient interface { + streamclient.Client + + RegisterInterception(func(event streamingccl.Event)) +} + // mockStreamClient will always return the given slice of events when consuming // a stream partition. type mockStreamClient struct { @@ -61,14 +72,157 @@ func (m *mockStreamClient) ConsumePartition( return eventCh, nil } +// Close implements the StreamClient interface. +func (m *mockStreamClient) Close() {} + func TestStreamIngestionProcessor(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) - defer tc.Stopper().Stop(context.Background()) + defer tc.Stopper().Stop(ctx) kvDB := tc.Server(0).DB() + v := roachpb.MakeValueFromString("value_1") + v.Timestamp = hlc.Timestamp{WallTime: 1} + sampleKV := roachpb.KeyValue{Key: roachpb.Key("key_1"), Value: v} + mockClient := &mockStreamClient{ + partitionEvents: []streamingccl.Event{ + streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), + streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + }, + } + + startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + out, err := runStreamIngestionProcessor(ctx, t, kvDB, "some://stream", startTime, + nil /* interceptors */, mockClient) + require.NoError(t, err) + + // Compare the set of results since the ordering is not guaranteed. + expectedRows := map[string]struct{}{ + "partition1{-\\x00} 0.000000001,0": {}, + "partition1{-\\x00} 0.000000004,0": {}, + "partition2{-\\x00} 0.000000001,0": {}, + "partition2{-\\x00} 0.000000004,0": {}, + } + actualRows := make(map[string]struct{}) + for { + row := out.NextNoMeta(t) + if row == nil { + break + } + datum := row[0].Datum + protoBytes, ok := datum.(*tree.DBytes) + require.True(t, ok) + + var resolvedSpan jobspb.ResolvedSpan + require.NoError(t, protoutil.Unmarshal([]byte(*protoBytes), &resolvedSpan)) + + actualRows[fmt.Sprintf("%s %s", resolvedSpan.Span, resolvedSpan.Timestamp)] = struct{}{} + } + + require.Equal(t, expectedRows, actualRows) +} + +// TestRandomClientGeneration tests the ingestion processor against a random +// stream workload. +func TestRandomClientGeneration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + makeTestStreamURI := func( + tableID string, + valueRange, kvsPerResolved int, + kvFrequency time.Duration, + ) string { + return "test://" + tableID + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + + "&KV_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + + "&KVS_PER_RESOLVED=" + strconv.Itoa(kvsPerResolved) + } + + tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + kvDB := tc.Server(0).DB() + conn := tc.Conns[0] + sqlDB := sqlutils.MakeSQLRunner(conn) + + // Create the expected table for the random stream to ingest into. + sqlDB.Exec(t, streamclient.RandomStreamSchema) + tableID := sqlDB.QueryStr(t, `SELECT id FROM system.namespace WHERE name = 'test'`)[0][0] + + // TODO: Consider testing variations on these parameters. + valueRange := 100 + kvsPerResolved := 1_000 + kvFrequency := 50 * time.Nanosecond + streamAddr := makeTestStreamURI(tableID, valueRange, kvsPerResolved, kvFrequency) + + startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + + ctx, cancel := context.WithCancel(ctx) + // Cancel the flow after emitting 1000 checkpoint events from the client. + cancelAfterCheckpoints := makeCheckpointEventCounter(1_000, cancel) + out, err := runStreamIngestionProcessor(ctx, t, kvDB, streamAddr, startTime, + cancelAfterCheckpoints, nil /* mockClient */) + require.NoError(t, err) + + p1Key := roachpb.Key("partition1") + p2Key := roachpb.Key("partition2") + p1Span := roachpb.Span{Key: p1Key, EndKey: p1Key.Next()} + p2Span := roachpb.Span{Key: p2Key, EndKey: p2Key.Next()} + numResolvedEvents := 0 + for { + row, meta := out.Next() + if meta != nil { + // The flow may fail with a context cancellation error if the processor + // was cut of during flushing. + if !testutils.IsError(meta.Err, "context canceled") { + t.Fatalf("unexpected meta error %v", meta.Err) + } + } + if row == nil { + break + } + datum := row[0].Datum + protoBytes, ok := datum.(*tree.DBytes) + require.True(t, ok) + + var resolvedSpan jobspb.ResolvedSpan + require.NoError(t, protoutil.Unmarshal([]byte(*protoBytes), &resolvedSpan)) + + if resolvedSpan.Span.String() != p1Span.String() && resolvedSpan.Span.String() != p2Span.String() { + t.Fatalf("expected resolved span %v to be either %v or %v", resolvedSpan.Span, p1Span, p2Span) + } + + // All resolved timestamp events should be greater than the start time. + require.Less(t, startTime.WallTime, resolvedSpan.Timestamp.WallTime) + numResolvedEvents++ + } + + // Check that some rows have been ingested and that we've emitted some resolved events. + numRows, err := strconv.Atoi(sqlDB.QueryStr(t, `SELECT count(*) FROM defaultdb.test`)[0][0]) + require.NoError(t, err) + require.Greater(t, numRows, 0, "at least 1 row ingested expected") + + require.Greater(t, numResolvedEvents, 0, "at least 1 resolved event expected") +} + +func runStreamIngestionProcessor( + ctx context.Context, + t *testing.T, + kvDB *kv.DB, + streamAddr string, + startTime hlc.Timestamp, + interceptEvents func(streamingccl.Event), + mockClient streamclient.Client, +) (*distsqlutils.RowBuffer, error) { st := cluster.MakeTestingClusterSettings() evalCtx := tree.MakeTestingEvalContext(st) @@ -84,64 +238,55 @@ func TestStreamIngestionProcessor(t *testing.T) { EvalCtx: &evalCtx, } - var wg sync.WaitGroup out := &distsqlutils.RowBuffer{} post := execinfrapb.PostProcessSpec{} var spec execinfrapb.StreamIngestionDataSpec - spec.PartitionAddresses = []streamingccl.PartitionAddress{"s3://my_streams/stream/partition1", "s3://my_streams/stream/partition2"} - proc, err := newStreamIngestionDataProcessor(&flowCtx, 0 /* processorID */, spec, &post, out) + spec.StreamAddress = streamingccl.StreamAddress(streamAddr) + + spec.PartitionAddresses = []streamingccl.PartitionAddress{"partition1", "partition2"} + spec.StartTime = startTime + processorID := int32(0) + proc, err := newStreamIngestionDataProcessor(&flowCtx, processorID, spec, &post, out) require.NoError(t, err) sip, ok := proc.(*streamIngestionProcessor) if !ok { t.Fatal("expected the processor that's created to be a split and scatter processor") } - // Inject a mock client. - v := roachpb.MakeValueFromString("value_1") - v.Timestamp = hlc.Timestamp{WallTime: 1} - sampleKV := roachpb.KeyValue{Key: roachpb.Key("key_1"), Value: v} - sip.client = &mockStreamClient{ - partitionEvents: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), - }, + if mockClient != nil { + sip.client = mockClient + } + + if interceptableClient, ok := sip.client.(interceptableStreamClient); ok { + interceptableClient.RegisterInterception(interceptEvents) + // TODO: Inject an interceptor here that keeps track of generated events so + // we can compare. + } else if interceptEvents != nil { + t.Fatalf("interceptor specified, but client %T does not implement interceptableStreamClient", + sip.client) } - sip.Run(context.Background()) - wg.Wait() + sip.Run(ctx) // Ensure that all the outputs are properly closed. if !out.ProducerClosed() { t.Fatalf("output RowReceiver not closed") } + return out, err +} - // Compare the set of results since the ordering is not guaranteed. - expectedRows := map[string]struct{}{ - "s3://my_streams/stream/partition1{-\\x00} 0.000000001,0": {}, - "s3://my_streams/stream/partition1{-\\x00} 0.000000004,0": {}, - "s3://my_streams/stream/partition2{-\\x00} 0.000000001,0": {}, - "s3://my_streams/stream/partition2{-\\x00} 0.000000004,0": {}, - } - actualRows := make(map[string]struct{}) - for { - row := out.NextNoMeta(t) - if row == nil { - break +// makeCheckpointEventCounter runs f after seeing `threshold` number of +// checkpoint events. +func makeCheckpointEventCounter(threshold int, f func()) func(streamingccl.Event) { + numCheckpointEventsGenerated := 0 + return func(event streamingccl.Event) { + switch event.Type() { + case streamingccl.CheckpointEvent: + numCheckpointEventsGenerated++ + if numCheckpointEventsGenerated > threshold { + f() + } } - datum := row[0].Datum - protoBytes, ok := datum.(*tree.DBytes) - require.True(t, ok) - - var resolvedSpan jobspb.ResolvedSpan - require.NoError(t, protoutil.Unmarshal([]byte(*protoBytes), &resolvedSpan)) - - actualRows[fmt.Sprintf("%s %s", resolvedSpan.Span, resolvedSpan.Timestamp)] = struct{}{} } - - require.Equal(t, expectedRows, actualRows) } diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index edbc30c735fe..09d0e94e7d66 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -64,12 +64,17 @@ type SSTBatcher struct { // which are the same, will all correspond to the same kv in the inverted // index. The method which generates these kvs does not dedup, thus we rely on // the SSTBatcher to dedup them (by skipping), rather than throwing a - // DuplicateKeyError. This is also true when used with IMPORT. Import + // DuplicateKeyError. + // This is also true when used with IMPORT. Import // generally prohibits the ingestion of KVs which will shadow existing data, // with the exception of duplicates having the same value and timestamp. To // maintain uniform behavior, duplicates in the same batch with equal values // will not raise a DuplicateKeyError. skipDuplicates bool + // ingestAll can only be set when disallowShadowing and skipDuplicates are + // false. It will never return a duplicateKey error and continue ingesting all + // data provided to it. + ingestAll bool // The rest of the fields accumulated state as opposed to configuration. Some, // like totalRows, are accumulated _across_ batches and are not reset between @@ -117,7 +122,7 @@ func MakeSSTBatcher( func MakeStreamSSTBatcher( ctx context.Context, db SSTSender, settings *cluster.Settings, flushBytes func() int64, ) (*SSTBatcher, error) { - b := &SSTBatcher{db: db, settings: settings, maxSize: flushBytes, disallowShadowing: false, skipDuplicates: true} + b := &SSTBatcher{db: db, settings: settings, maxSize: flushBytes, ingestAll: true} err := b.Reset(ctx) return b, err } @@ -143,7 +148,7 @@ func (b *SSTBatcher) updateMVCCStats(key storage.MVCCKey, value []byte) { // keys -- like RESTORE where we want the restored data to look the like backup. // Keys must be added in order. func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error { - if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) { + if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) && !b.ingestAll { if b.skipDuplicates && bytes.Equal(b.batchEndValue, value) { return nil } diff --git a/pkg/kv/kvserver/kvserverbase/bulk_adder.go b/pkg/kv/kvserver/kvserverbase/bulk_adder.go index 563054da7d3d..4f8f686093a8 100644 --- a/pkg/kv/kvserver/kvserverbase/bulk_adder.go +++ b/pkg/kv/kvserver/kvserverbase/bulk_adder.go @@ -47,10 +47,10 @@ type BulkAdderOptions struct { // BulkAdder buffer if the memory monitor permits. StepBufferSize int64 - // SkipLocalDuplicates configures handling of duplicate keys within a local - // sorted batch. When true if the same key/value pair is added more than once + // SkipDuplicates configures handling of duplicate keys within a local sorted + // batch. When true if the same key/value pair is added more than once // subsequent additions will be ignored instead of producing an error. If an - // attempt to add the same key has a differnet value, it is always an error. + // attempt to add the same key has a different value, it is always an error. // Once a batch is flushed – explicitly or automatically – local duplicate // detection does not apply. SkipDuplicates bool diff --git a/pkg/sql/execinfrapb/processors_bulk_io.pb.go b/pkg/sql/execinfrapb/processors_bulk_io.pb.go index d84221ebeb82..4560f871184b 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.pb.go +++ b/pkg/sql/execinfrapb/processors_bulk_io.pb.go @@ -72,7 +72,7 @@ func (x *FileCompression) UnmarshalJSON(data []byte) error { return nil } func (FileCompression) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{0} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{0} } type BackfillerSpec_Type int32 @@ -111,7 +111,7 @@ func (x *BackfillerSpec_Type) UnmarshalJSON(data []byte) error { return nil } func (BackfillerSpec_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{0, 0} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{0, 0} } // BackfillerSpec is the specification for a "schema change backfiller". @@ -143,7 +143,7 @@ func (m *BackfillerSpec) Reset() { *m = BackfillerSpec{} } func (m *BackfillerSpec) String() string { return proto.CompactTextString(m) } func (*BackfillerSpec) ProtoMessage() {} func (*BackfillerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{0} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{0} } func (m *BackfillerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -183,7 +183,7 @@ func (m *JobProgress) Reset() { *m = JobProgress{} } func (m *JobProgress) String() string { return proto.CompactTextString(m) } func (*JobProgress) ProtoMessage() {} func (*JobProgress) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{1} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{1} } func (m *JobProgress) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -246,7 +246,7 @@ func (m *ReadImportDataSpec) Reset() { *m = ReadImportDataSpec{} } func (m *ReadImportDataSpec) String() string { return proto.CompactTextString(m) } func (*ReadImportDataSpec) ProtoMessage() {} func (*ReadImportDataSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{2} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{2} } func (m *ReadImportDataSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -284,7 +284,7 @@ func (m *ReadImportDataSpec_ImportTable) Reset() { *m = ReadImportDataSp func (m *ReadImportDataSpec_ImportTable) String() string { return proto.CompactTextString(m) } func (*ReadImportDataSpec_ImportTable) ProtoMessage() {} func (*ReadImportDataSpec_ImportTable) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{2, 0} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{2, 0} } func (m *ReadImportDataSpec_ImportTable) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -315,13 +315,15 @@ type StreamIngestionDataSpec struct { PartitionAddresses []github_com_cockroachdb_cockroach_pkg_ccl_streamingccl.PartitionAddress `protobuf:"bytes,1,rep,name=partition_addresses,json=partitionAddresses,customtype=github.com/cockroachdb/cockroach/pkg/ccl/streamingccl.PartitionAddress" json:"partition_addresses"` // The processor will ingest events from StartTime onwards. StartTime hlc.Timestamp `protobuf:"bytes,2,opt,name=start_time,json=startTime" json:"start_time"` + // StreamAddress locate the stream so that a stream client can be initialized. + StreamAddress github_com_cockroachdb_cockroach_pkg_ccl_streamingccl.StreamAddress `protobuf:"bytes,3,opt,name=stream_address,json=streamAddress,customtype=github.com/cockroachdb/cockroach/pkg/ccl/streamingccl.StreamAddress" json:"stream_address"` } func (m *StreamIngestionDataSpec) Reset() { *m = StreamIngestionDataSpec{} } func (m *StreamIngestionDataSpec) String() string { return proto.CompactTextString(m) } func (*StreamIngestionDataSpec) ProtoMessage() {} func (*StreamIngestionDataSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{3} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{3} } func (m *StreamIngestionDataSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -367,7 +369,7 @@ func (m *BackupDataSpec) Reset() { *m = BackupDataSpec{} } func (m *BackupDataSpec) String() string { return proto.CompactTextString(m) } func (*BackupDataSpec) ProtoMessage() {} func (*BackupDataSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{4} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{4} } func (m *BackupDataSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -406,7 +408,7 @@ func (m *RestoreSpanEntry) Reset() { *m = RestoreSpanEntry{} } func (m *RestoreSpanEntry) String() string { return proto.CompactTextString(m) } func (*RestoreSpanEntry) ProtoMessage() {} func (*RestoreSpanEntry) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{5} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{5} } func (m *RestoreSpanEntry) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -444,7 +446,7 @@ func (m *RestoreDataSpec) Reset() { *m = RestoreDataSpec{} } func (m *RestoreDataSpec) String() string { return proto.CompactTextString(m) } func (*RestoreDataSpec) ProtoMessage() {} func (*RestoreDataSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{6} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{6} } func (m *RestoreDataSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -478,7 +480,7 @@ func (m *SplitAndScatterSpec) Reset() { *m = SplitAndScatterSpec{} } func (m *SplitAndScatterSpec) String() string { return proto.CompactTextString(m) } func (*SplitAndScatterSpec) ProtoMessage() {} func (*SplitAndScatterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{7} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{7} } func (m *SplitAndScatterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -511,7 +513,7 @@ func (m *SplitAndScatterSpec_RestoreEntryChunk) Reset() { *m = SplitAndS func (m *SplitAndScatterSpec_RestoreEntryChunk) String() string { return proto.CompactTextString(m) } func (*SplitAndScatterSpec_RestoreEntryChunk) ProtoMessage() {} func (*SplitAndScatterSpec_RestoreEntryChunk) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{7, 0} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{7, 0} } func (m *SplitAndScatterSpec_RestoreEntryChunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -558,7 +560,7 @@ func (m *CSVWriterSpec) Reset() { *m = CSVWriterSpec{} } func (m *CSVWriterSpec) String() string { return proto.CompactTextString(m) } func (*CSVWriterSpec) ProtoMessage() {} func (*CSVWriterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{8} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{8} } func (m *CSVWriterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -593,7 +595,7 @@ func (m *BulkRowWriterSpec) Reset() { *m = BulkRowWriterSpec{} } func (m *BulkRowWriterSpec) String() string { return proto.CompactTextString(m) } func (*BulkRowWriterSpec) ProtoMessage() {} func (*BulkRowWriterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{9} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{9} } func (m *BulkRowWriterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -933,6 +935,10 @@ func (m *StreamIngestionDataSpec) MarshalTo(dAtA []byte) (int, error) { return 0, err } i += n7 + dAtA[i] = 0x1a + i++ + i = encodeVarintProcessorsBulkIo(dAtA, i, uint64(len(m.StreamAddress))) + i += copy(dAtA[i:], m.StreamAddress) return i, nil } @@ -1442,6 +1448,8 @@ func (m *StreamIngestionDataSpec) Size() (n int) { } l = m.StartTime.Size() n += 1 + l + sovProcessorsBulkIo(uint64(l)) + l = len(m.StreamAddress) + n += 1 + l + sovProcessorsBulkIo(uint64(l)) return n } @@ -2660,6 +2668,35 @@ func (m *StreamIngestionDataSpec) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StreamAddress", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessorsBulkIo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthProcessorsBulkIo + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.StreamAddress = github_com_cockroachdb_cockroach_pkg_ccl_streamingccl.StreamAddress(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipProcessorsBulkIo(dAtA[iNdEx:]) @@ -4135,119 +4172,121 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/processors_bulk_io.proto", fileDescriptor_processors_bulk_io_fd790481d663b450) + proto.RegisterFile("sql/execinfrapb/processors_bulk_io.proto", fileDescriptor_processors_bulk_io_791817333cf840e0) } -var fileDescriptor_processors_bulk_io_fd790481d663b450 = []byte{ - // 1755 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x58, 0xcf, 0x6e, 0x1b, 0xb9, - 0x19, 0xf7, 0x48, 0x23, 0x59, 0xfa, 0x14, 0xdb, 0x32, 0x93, 0xdd, 0x9d, 0x1a, 0xa8, 0x6d, 0x68, - 0xd7, 0xa9, 0x9a, 0x22, 0x12, 0x36, 0x69, 0x8b, 0xa0, 0xed, 0x6e, 0x1a, 0xc9, 0x71, 0x56, 0xce, - 0x6e, 0xe2, 0x8e, 0x62, 0x2f, 0xb0, 0x68, 0x31, 0xa0, 0x66, 0x68, 0x99, 0xd1, 0x68, 0x38, 0x26, - 0x39, 0x4e, 0x94, 0x4b, 0x0b, 0xf4, 0xd4, 0x5b, 0x1f, 0xa1, 0x6f, 0xd0, 0xbe, 0x43, 0x2f, 0x39, - 0xee, 0x71, 0xd1, 0x83, 0xd1, 0x3a, 0x6f, 0xd1, 0x53, 0x41, 0x72, 0x46, 0x1e, 0x3b, 0xb6, 0x63, - 0x37, 0xc8, 0x45, 0x99, 0x90, 0xdf, 0xef, 0xc7, 0xef, 0xff, 0x47, 0x1a, 0x9a, 0x62, 0x3f, 0x6c, - 0x93, 0x97, 0xc4, 0xa7, 0xd1, 0x2e, 0xc7, 0xf1, 0xa0, 0x1d, 0x73, 0xe6, 0x13, 0x21, 0x18, 0x17, - 0xde, 0x20, 0x09, 0x47, 0x1e, 0x65, 0xad, 0x98, 0x33, 0xc9, 0x90, 0xe3, 0x33, 0x7f, 0xc4, 0x19, - 0xf6, 0xf7, 0x5a, 0x62, 0x3f, 0x6c, 0x05, 0x54, 0x48, 0xb1, 0x1f, 0xf2, 0x24, 0x5a, 0xfa, 0xf8, - 0x39, 0x1b, 0x88, 0xb6, 0xfa, 0x89, 0x07, 0xfa, 0x1f, 0x83, 0x58, 0x72, 0xb4, 0x74, 0x3c, 0x68, - 0x53, 0x76, 0x7b, 0x97, 0xf1, 0x31, 0x96, 0xd9, 0xce, 0xa7, 0xea, 0x54, 0x1f, 0x4b, 0x1c, 0xb2, - 0x61, 0x3b, 0x20, 0xc2, 0x8f, 0x07, 0x6d, 0x21, 0x79, 0xe2, 0xcb, 0x84, 0x93, 0x20, 0x15, 0x5a, - 0xbb, 0x48, 0x35, 0x2c, 0x48, 0x76, 0x4a, 0x22, 0x69, 0xd8, 0xde, 0x0b, 0xfd, 0xb6, 0xa4, 0x63, - 0x22, 0x24, 0x1e, 0xc7, 0xe9, 0xce, 0x8d, 0x21, 0x1b, 0x32, 0xfd, 0xd9, 0x56, 0x5f, 0xe9, 0x2a, - 0xca, 0xb4, 0x0a, 0xb0, 0xc4, 0xe9, 0xda, 0x62, 0xb6, 0x86, 0x63, 0x6a, 0x96, 0x1a, 0x7f, 0x2f, - 0xc2, 0x7c, 0x07, 0xfb, 0xa3, 0x5d, 0x1a, 0x86, 0x84, 0xf7, 0x63, 0xe2, 0xa3, 0x47, 0x60, 0xcb, - 0x49, 0x4c, 0x1c, 0x6b, 0xd5, 0x6a, 0xce, 0xdf, 0xb9, 0xdd, 0x3a, 0xcf, 0x21, 0xad, 0x93, 0xb8, - 0xd6, 0xb3, 0x49, 0x4c, 0x3a, 0xf6, 0xeb, 0xc3, 0x95, 0x19, 0x57, 0x13, 0xa0, 0x0e, 0x94, 0x24, - 0x1e, 0x84, 0xc4, 0x29, 0xac, 0x5a, 0xcd, 0xda, 0x9d, 0x9b, 0xa7, 0x98, 0xc4, 0x7e, 0xa8, 0xed, - 0x7b, 0xa6, 0x64, 0xd6, 0x89, 0xf0, 0x39, 0x8d, 0x25, 0xe3, 0x29, 0x85, 0x81, 0xa2, 0x87, 0x50, - 0x12, 0x31, 0x8e, 0x84, 0x53, 0x5c, 0x2d, 0x36, 0x6b, 0x77, 0x7e, 0x7a, 0xbe, 0x36, 0x9a, 0xc6, - 0x25, 0x38, 0x50, 0xea, 0xe0, 0x28, 0xa3, 0xd1, 0x68, 0xf4, 0x39, 0x54, 0x82, 0x84, 0x63, 0x49, - 0x59, 0xe4, 0xd8, 0xab, 0x56, 0xb3, 0xd8, 0xf9, 0x48, 0x6d, 0xff, 0xf7, 0x70, 0x65, 0x4e, 0xb9, - 0xb3, 0xb5, 0x9e, 0x6e, 0xba, 0x53, 0x31, 0xf4, 0x29, 0x80, 0xbf, 0x97, 0x44, 0x23, 0x4f, 0xd0, - 0x57, 0xc4, 0x29, 0x69, 0x90, 0xe1, 0xac, 0xea, 0xf5, 0x3e, 0x7d, 0x45, 0xd0, 0x7d, 0xa8, 0x70, - 0x82, 0x83, 0x07, 0xe2, 0xe9, 0xae, 0x33, 0xab, 0xad, 0xfc, 0x71, 0x4e, 0x43, 0x15, 0xb2, 0xd6, - 0x5e, 0xe8, 0xb7, 0x9e, 0x65, 0x21, 0x4b, 0x19, 0xa6, 0xa0, 0xc6, 0x2d, 0xb0, 0x95, 0xdf, 0x50, - 0x0d, 0x66, 0x7b, 0xd1, 0x01, 0x0e, 0x69, 0x50, 0x9f, 0x41, 0x00, 0xe5, 0x2e, 0x0b, 0x93, 0x71, - 0x54, 0xb7, 0x50, 0x15, 0x4a, 0xbd, 0x28, 0x20, 0x2f, 0xeb, 0x85, 0x4d, 0xbb, 0x52, 0xae, 0xcf, - 0x36, 0x5e, 0x40, 0x6d, 0x93, 0x0d, 0xb6, 0x38, 0x1b, 0x72, 0x22, 0x04, 0xfa, 0x0c, 0xca, 0xcf, - 0xd9, 0xc0, 0xa3, 0x81, 0x8e, 0x57, 0xb1, 0x33, 0xa7, 0x0e, 0x38, 0x3a, 0x5c, 0x29, 0x6d, 0xb2, - 0x41, 0x6f, 0xdd, 0x2d, 0x3d, 0x67, 0x83, 0x5e, 0x80, 0x9a, 0x70, 0xcd, 0x67, 0x91, 0xe4, 0x74, - 0x90, 0x68, 0x1f, 0xa8, 0x88, 0x14, 0x52, 0x65, 0x4e, 0xec, 0x20, 0x07, 0x6c, 0x11, 0x32, 0xe9, - 0x14, 0x57, 0xad, 0x66, 0x29, 0x0b, 0xa7, 0x5a, 0x69, 0xbc, 0xae, 0x00, 0x52, 0xfe, 0xed, 0x8d, - 0x63, 0xc6, 0xe5, 0x3a, 0x96, 0x58, 0xa7, 0xcb, 0x1a, 0xd4, 0x04, 0x1e, 0xc7, 0x21, 0x31, 0x8e, - 0x2a, 0xe4, 0x70, 0x60, 0x36, 0xb4, 0xa7, 0x1e, 0x41, 0x25, 0x4e, 0x75, 0x76, 0xca, 0xda, 0x53, - 0x6b, 0xe7, 0xc7, 0x32, 0x67, 0x60, 0xe6, 0xb1, 0x0c, 0x8c, 0x1e, 0x41, 0x31, 0xe1, 0xd4, 0x99, - 0xd5, 0xf9, 0xf0, 0x8b, 0xf3, 0x39, 0xde, 0x56, 0xb5, 0xb5, 0xcd, 0xe9, 0xc3, 0x48, 0xf2, 0x89, - 0xab, 0x18, 0xd0, 0x17, 0x50, 0x36, 0xe5, 0xea, 0x54, 0xb4, 0x3e, 0x2b, 0x39, 0xae, 0xb4, 0x50, - 0x5a, 0xbd, 0xa7, 0x1b, 0x34, 0x24, 0x1b, 0x5a, 0x2c, 0xd5, 0x24, 0x05, 0xa1, 0x1d, 0x28, 0xeb, - 0x14, 0x15, 0x4e, 0x55, 0xab, 0x72, 0xef, 0x4a, 0xaa, 0xe8, 0x6c, 0x15, 0x5a, 0x1b, 0xcd, 0x6b, - 0xb9, 0x29, 0x1b, 0xba, 0x0f, 0x3f, 0x12, 0x23, 0x1a, 0x7b, 0x63, 0x2a, 0x04, 0x8d, 0x86, 0xde, - 0x2e, 0xe3, 0x84, 0x0e, 0x23, 0x6f, 0x44, 0x26, 0xc2, 0x81, 0x55, 0xab, 0x59, 0x49, 0x15, 0xf9, - 0x58, 0x89, 0x7d, 0x63, 0xa4, 0x36, 0x8c, 0xd0, 0x63, 0x32, 0x11, 0xe8, 0x16, 0xcc, 0xbd, 0xc0, - 0x61, 0xa8, 0xf2, 0xfa, 0x09, 0x8e, 0x98, 0x70, 0x6a, 0xb9, 0xdc, 0x3d, 0xb9, 0x85, 0xee, 0xc0, - 0x22, 0xd7, 0x25, 0xb3, 0x85, 0x39, 0x0e, 0x43, 0x12, 0x52, 0x31, 0x76, 0xe6, 0x72, 0x21, 0x7c, - 0x7b, 0x1b, 0x7d, 0x07, 0xc0, 0x89, 0x48, 0xc6, 0xc4, 0x8b, 0x99, 0x70, 0xe6, 0xb5, 0xf1, 0xbf, - 0xbe, 0x92, 0xf1, 0xae, 0x86, 0x6f, 0x31, 0x63, 0xbf, 0x5b, 0xe5, 0xd9, 0xff, 0x11, 0x01, 0x48, - 0x04, 0xe1, 0x9e, 0x6e, 0x4e, 0xce, 0xc2, 0xaa, 0xd5, 0xac, 0x76, 0x36, 0xd2, 0x4a, 0xfd, 0x72, - 0x48, 0xe5, 0x5e, 0x32, 0x68, 0xf9, 0x6c, 0xdc, 0x9e, 0x9e, 0x16, 0x0c, 0x8e, 0xbf, 0xdb, 0xf1, - 0x68, 0xd8, 0x16, 0xc4, 0x4f, 0x38, 0x95, 0x93, 0x56, 0xff, 0x77, 0x5f, 0x6f, 0x0b, 0xc2, 0x23, - 0x3c, 0x26, 0x5b, 0x8a, 0xcd, 0xad, 0x2a, 0x66, 0xfd, 0xb9, 0x94, 0x40, 0xcd, 0xa8, 0xa4, 0xc3, - 0x80, 0x7e, 0x0b, 0xb6, 0xea, 0xce, 0xba, 0x82, 0xae, 0xd6, 0xa7, 0x2c, 0x57, 0x23, 0xd1, 0x67, - 0x00, 0x12, 0xf3, 0x21, 0x91, 0x5d, 0x16, 0x0a, 0xa7, 0xb0, 0x5a, 0x6c, 0x56, 0xd3, 0xfd, 0xdc, - 0xfa, 0x92, 0x80, 0x5a, 0x2e, 0xee, 0xa8, 0x0e, 0xc5, 0x11, 0x99, 0xe8, 0x53, 0xab, 0xae, 0xfa, - 0x44, 0x4f, 0xa0, 0x74, 0x80, 0xc3, 0x24, 0xeb, 0x98, 0x57, 0x4b, 0xa9, 0x9c, 0x45, 0xae, 0xa1, - 0xf9, 0x55, 0xe1, 0x9e, 0xb5, 0xf4, 0x4b, 0xa8, 0x64, 0x79, 0x9f, 0x3f, 0xb1, 0x64, 0x4e, 0xbc, - 0x91, 0x3f, 0xb1, 0x9a, 0xc7, 0xfd, 0x06, 0xe6, 0x4f, 0xc6, 0xe9, 0x5d, 0xe8, 0x62, 0x0e, 0xbd, - 0x69, 0x57, 0x2c, 0xdd, 0xb1, 0x8a, 0x75, 0x7b, 0xd3, 0xae, 0xd8, 0xf5, 0xd2, 0xa6, 0x5d, 0x29, - 0xd5, 0xcb, 0x9b, 0x76, 0xe5, 0x5a, 0x7d, 0xae, 0x71, 0x68, 0xc1, 0x27, 0x7d, 0xc9, 0x09, 0x1e, - 0xf7, 0xa2, 0x21, 0x11, 0xaa, 0xf1, 0x4c, 0xfb, 0xc9, 0x1f, 0xe1, 0x7a, 0x8c, 0xb9, 0xa4, 0x6a, - 0xd1, 0xc3, 0x41, 0xa0, 0x8a, 0x9e, 0x08, 0xc7, 0xd2, 0x3e, 0x7d, 0xa2, 0x72, 0xe1, 0x5f, 0x87, - 0x2b, 0x1b, 0x97, 0xca, 0x05, 0xdf, 0x0f, 0xd5, 0xbc, 0x25, 0x78, 0x4c, 0xa3, 0xa1, 0xef, 0x87, - 0xad, 0xad, 0x8c, 0xf8, 0x81, 0xe1, 0x75, 0x51, 0x7c, 0x6a, 0x85, 0x08, 0xd4, 0x01, 0x10, 0x12, - 0x73, 0xe9, 0xa9, 0x32, 0x49, 0x23, 0x71, 0xa9, 0xae, 0x5e, 0xd5, 0x30, 0xb5, 0xda, 0xf8, 0xe7, - 0xac, 0x19, 0xab, 0x49, 0x3c, 0xb5, 0xeb, 0x6e, 0x36, 0xc9, 0x2c, 0x5d, 0x31, 0x9f, 0x9c, 0xd1, - 0x6d, 0xde, 0x9e, 0x5b, 0x5f, 0x41, 0x9d, 0x46, 0x92, 0xb3, 0x20, 0xf1, 0x49, 0xe0, 0x19, 0x7c, - 0xe1, 0x32, 0xf8, 0x85, 0x63, 0x58, 0x5f, 0x33, 0xdd, 0x85, 0x5a, 0x40, 0x76, 0x71, 0x12, 0x4a, - 0x4f, 0xb5, 0xcf, 0xa2, 0x2e, 0x2d, 0x94, 0x0e, 0x0b, 0x58, 0x37, 0x5b, 0xdb, 0x6e, 0xcf, 0x85, - 0x54, 0x6c, 0x9b, 0x53, 0xf4, 0x67, 0x0b, 0xae, 0x27, 0x9c, 0x0a, 0x6f, 0x30, 0xf1, 0x42, 0xe6, - 0xe3, 0x90, 0xca, 0x89, 0x37, 0x3a, 0x70, 0x6c, 0xad, 0xc2, 0x97, 0x17, 0x5f, 0x0d, 0x8e, 0x6d, - 0x57, 0x8d, 0x57, 0x74, 0x26, 0x5f, 0xa7, 0x0c, 0x8f, 0x0f, 0x4c, 0xdf, 0xbb, 0x71, 0x74, 0xb8, - 0x52, 0xdf, 0x76, 0x7b, 0xf9, 0xad, 0x1d, 0xb7, 0x9e, 0x9c, 0x12, 0x46, 0x2e, 0xd4, 0xc6, 0x07, - 0xbe, 0xef, 0xed, 0xd2, 0x50, 0x12, 0xae, 0x47, 0xf1, 0xfc, 0x89, 0x88, 0x64, 0xf6, 0x7f, 0xb3, - 0xd3, 0xed, 0x6e, 0x68, 0xa1, 0x63, 0xcb, 0x8e, 0xd7, 0x5c, 0x50, 0x2c, 0xe6, 0x1b, 0x7d, 0x05, - 0x40, 0x22, 0x9f, 0x4f, 0x62, 0x3d, 0x0e, 0xcd, 0x40, 0x6a, 0x9e, 0x41, 0xa9, 0xda, 0xff, 0xc3, - 0xa9, 0xe0, 0x53, 0xfd, 0x2b, 0xdc, 0x1c, 0x16, 0x3d, 0x85, 0xc5, 0x81, 0xb6, 0xd6, 0xcb, 0x65, - 0xcd, 0x15, 0xee, 0x02, 0x0b, 0x06, 0xdd, 0xcf, 0x72, 0x07, 0x3d, 0x86, 0x74, 0xc9, 0x23, 0x51, - 0x60, 0xe8, 0x2a, 0x97, 0xa7, 0x9b, 0x33, 0xd8, 0x87, 0x51, 0xa0, 0xc9, 0xb6, 0xa1, 0x1c, 0x8f, - 0x3c, 0x1a, 0x64, 0x53, 0xea, 0xee, 0xa5, 0x63, 0xb6, 0x35, 0xea, 0x05, 0xe9, 0x80, 0xaa, 0xaa, - 0xfb, 0xc4, 0xd6, 0xe3, 0xde, 0xba, 0x70, 0x4b, 0xb1, 0x5a, 0x3e, 0xd5, 0xa7, 0xe1, 0x43, 0xf5, - 0xe9, 0x2e, 0x7c, 0x74, 0x66, 0xea, 0x9c, 0xd1, 0x3a, 0xcf, 0x6f, 0x64, 0xf7, 0x00, 0x8e, 0x6d, - 0xc9, 0x23, 0xed, 0x33, 0x90, 0x95, 0x1c, 0xb2, 0xf1, 0x0f, 0x0b, 0xea, 0x2e, 0x11, 0x92, 0x71, - 0xa2, 0x8a, 0xc8, 0x10, 0x7c, 0x0e, 0xb6, 0xaa, 0xc3, 0x74, 0x58, 0xbc, 0xa3, 0x0c, 0xb5, 0x28, - 0x7a, 0x00, 0xa5, 0x5d, 0xaa, 0x6e, 0x0a, 0xa6, 0x74, 0xd7, 0xce, 0xba, 0x68, 0xe8, 0xe6, 0xed, - 0x92, 0xfd, 0x84, 0x08, 0xa9, 0xb3, 0x2e, 0x6b, 0x04, 0x1a, 0x89, 0x6e, 0x42, 0x2d, 0xbb, 0x01, - 0xf5, 0x82, 0x97, 0xba, 0x7c, 0xb3, 0x91, 0x9e, 0xdf, 0x68, 0xfc, 0xa9, 0x08, 0x0b, 0xa9, 0xca, - 0xd3, 0xce, 0xb3, 0x01, 0xd7, 0xb8, 0x59, 0x32, 0xd9, 0x64, 0x5d, 0x3e, 0x9b, 0x6a, 0x29, 0x50, - 0xe7, 0xd2, 0xc9, 0x9a, 0x29, 0xbc, 0x47, 0xcd, 0xf4, 0xa0, 0xcc, 0x89, 0xbe, 0xd0, 0x98, 0x6b, - 0xfd, 0xcf, 0xde, 0xe9, 0x91, 0xf4, 0x76, 0x3f, 0x22, 0x93, 0xec, 0x1a, 0x66, 0x08, 0xd4, 0x35, - 0x2c, 0x4d, 0x70, 0xd3, 0x94, 0x7e, 0x7e, 0xd1, 0xcc, 0x3c, 0xe1, 0x97, 0x0b, 0x33, 0xfc, 0x3d, - 0xb2, 0xe6, 0x6f, 0x05, 0xb8, 0xde, 0x8f, 0x43, 0x2a, 0x1f, 0x44, 0x41, 0xdf, 0xc7, 0x52, 0xa6, - 0xef, 0xaa, 0x3f, 0x40, 0x59, 0x3f, 0x1c, 0xb2, 0x09, 0x70, 0xff, 0x7c, 0x4d, 0xcf, 0x80, 0x67, - 0xda, 0x6b, 0x7d, 0xba, 0x8a, 0x27, 0x73, 0x84, 0x21, 0xcd, 0xf9, 0xb4, 0xf0, 0x9e, 0x3e, 0x5d, - 0xf2, 0x60, 0xf1, 0xad, 0xd3, 0xd0, 0x26, 0xcc, 0x12, 0xf5, 0x4e, 0x20, 0x99, 0xfe, 0xb7, 0xde, - 0xe9, 0xe9, 0x69, 0xd1, 0xa4, 0xfc, 0x19, 0x41, 0xe3, 0x2f, 0x45, 0x98, 0xeb, 0xf6, 0x77, 0xbe, - 0xe5, 0x34, 0x73, 0xce, 0x4d, 0x35, 0x9e, 0x84, 0xa4, 0x91, 0x79, 0xa3, 0xe9, 0xc2, 0xce, 0x72, - 0x30, 0xb7, 0x81, 0x7e, 0x02, 0xd7, 0x54, 0xa7, 0xf0, 0x62, 0xed, 0x18, 0x93, 0x85, 0x53, 0x41, - 0xdd, 0x43, 0xcc, 0x06, 0xfa, 0x02, 0x66, 0x99, 0xc9, 0x3c, 0x5d, 0x2c, 0xb5, 0x33, 0x07, 0x46, - 0xb7, 0xbf, 0x93, 0xa6, 0x67, 0xa6, 0x61, 0x8a, 0x39, 0x7e, 0xfd, 0x71, 0xf6, 0x42, 0xa4, 0x4f, - 0xc6, 0xfc, 0xeb, 0xcf, 0x65, 0x2f, 0x04, 0xfa, 0x3d, 0x2c, 0xfa, 0x6c, 0x1c, 0xab, 0xda, 0x53, - 0x97, 0x15, 0x9f, 0x05, 0xc4, 0x4f, 0xc7, 0xd3, 0x05, 0x0f, 0x55, 0x55, 0x1e, 0xdd, 0x63, 0x58, - 0x4a, 0x5b, 0xcf, 0x31, 0x75, 0x15, 0xd1, 0xa9, 0x1e, 0x5b, 0xfe, 0x40, 0x3d, 0xb6, 0xf1, 0x2d, - 0x2c, 0x76, 0x92, 0x50, 0x19, 0x94, 0x0b, 0xc7, 0xf4, 0xe9, 0x6e, 0xfd, 0xdf, 0x4f, 0xf7, 0x5b, - 0x6b, 0xb0, 0x70, 0xca, 0x54, 0x54, 0x01, 0xfb, 0x09, 0x8b, 0x48, 0x7d, 0x46, 0x7d, 0x3d, 0x7a, - 0x45, 0xe3, 0xba, 0xd5, 0xb9, 0xfd, 0xfa, 0x3f, 0xcb, 0x33, 0xaf, 0x8f, 0x96, 0xad, 0xef, 0x8f, - 0x96, 0xad, 0x1f, 0x8e, 0x96, 0xad, 0x7f, 0x1f, 0x2d, 0x5b, 0x7f, 0x7d, 0xb3, 0x3c, 0xf3, 0xfd, - 0x9b, 0xe5, 0x99, 0x1f, 0xde, 0x2c, 0xcf, 0x7c, 0x57, 0xcb, 0xfd, 0x75, 0xe4, 0x7f, 0x01, 0x00, - 0x00, 0xff, 0xff, 0x8d, 0xad, 0xd6, 0x98, 0xca, 0x11, 0x00, 0x00, +var fileDescriptor_processors_bulk_io_791817333cf840e0 = []byte{ + // 1783 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x58, 0x4f, 0x6f, 0x1b, 0xb9, + 0x15, 0xf7, 0x48, 0x23, 0x59, 0x7a, 0x8a, 0x1d, 0x99, 0xc9, 0xee, 0x4e, 0x0d, 0xd4, 0x36, 0xb4, + 0xeb, 0x54, 0x4d, 0x11, 0x09, 0x9b, 0xb4, 0x45, 0xd0, 0x76, 0x37, 0x8d, 0xe4, 0x38, 0x2b, 0x7b, + 0x37, 0x71, 0x47, 0xb1, 0x17, 0x58, 0xb4, 0x18, 0x50, 0x33, 0xb4, 0xcc, 0x68, 0x34, 0x1c, 0x93, + 0x1c, 0x27, 0xca, 0xa5, 0x05, 0x7a, 0xea, 0xad, 0x1f, 0xa1, 0xdf, 0xa0, 0xfd, 0x0e, 0xbd, 0xe4, + 0xb8, 0xe8, 0x69, 0xd1, 0x83, 0xd1, 0x3a, 0xdf, 0xa2, 0xa7, 0x82, 0xe4, 0x8c, 0x3c, 0x76, 0x6c, + 0xc7, 0xde, 0x60, 0x2f, 0x36, 0x45, 0xbe, 0xdf, 0x8f, 0xef, 0x3d, 0xbe, 0x3f, 0xe4, 0x40, 0x53, + 0xec, 0x87, 0x6d, 0xf2, 0x92, 0xf8, 0x34, 0xda, 0xe5, 0x38, 0x1e, 0xb4, 0x63, 0xce, 0x7c, 0x22, + 0x04, 0xe3, 0xc2, 0x1b, 0x24, 0xe1, 0xc8, 0xa3, 0xac, 0x15, 0x73, 0x26, 0x19, 0x72, 0x7c, 0xe6, + 0x8f, 0x38, 0xc3, 0xfe, 0x5e, 0x4b, 0xec, 0x87, 0xad, 0x80, 0x0a, 0x29, 0xf6, 0x43, 0x9e, 0x44, + 0x8b, 0x1f, 0x3e, 0x67, 0x03, 0xd1, 0x56, 0x7f, 0xe2, 0x81, 0xfe, 0x67, 0x10, 0x8b, 0x8e, 0x96, + 0x8e, 0x07, 0x6d, 0xca, 0xee, 0xec, 0x32, 0x3e, 0xc6, 0x32, 0x5b, 0xf9, 0x58, 0xed, 0xea, 0x63, + 0x89, 0x43, 0x36, 0x6c, 0x07, 0x44, 0xf8, 0xf1, 0xa0, 0x2d, 0x24, 0x4f, 0x7c, 0x99, 0x70, 0x12, + 0xa4, 0x42, 0xab, 0x17, 0xa9, 0x86, 0x05, 0xc9, 0x76, 0x49, 0x24, 0x0d, 0xdb, 0x7b, 0xa1, 0xdf, + 0x96, 0x74, 0x4c, 0x84, 0xc4, 0xe3, 0x38, 0x5d, 0xb9, 0x39, 0x64, 0x43, 0xa6, 0x87, 0x6d, 0x35, + 0x4a, 0x67, 0x51, 0xa6, 0x55, 0x80, 0x25, 0x4e, 0xe7, 0x16, 0xb2, 0x39, 0x1c, 0x53, 0x33, 0xd5, + 0xf8, 0x7b, 0x11, 0xe6, 0x3b, 0xd8, 0x1f, 0xed, 0xd2, 0x30, 0x24, 0xbc, 0x1f, 0x13, 0x1f, 0x3d, + 0x06, 0x5b, 0x4e, 0x62, 0xe2, 0x58, 0x2b, 0x56, 0x73, 0xfe, 0xee, 0x9d, 0xd6, 0x79, 0x0e, 0x69, + 0x9d, 0xc4, 0xb5, 0x9e, 0x4d, 0x62, 0xd2, 0xb1, 0x5f, 0x1f, 0x2e, 0xcf, 0xb8, 0x9a, 0x00, 0x75, + 0xa0, 0x24, 0xf1, 0x20, 0x24, 0x4e, 0x61, 0xc5, 0x6a, 0xd6, 0xee, 0xde, 0x3a, 0xc5, 0x24, 0xf6, + 0x43, 0x6d, 0xdf, 0x33, 0x25, 0xb3, 0x46, 0x84, 0xcf, 0x69, 0x2c, 0x19, 0x4f, 0x29, 0x0c, 0x14, + 0x3d, 0x82, 0x92, 0x88, 0x71, 0x24, 0x9c, 0xe2, 0x4a, 0xb1, 0x59, 0xbb, 0xfb, 0xd3, 0xf3, 0xb5, + 0xd1, 0x34, 0x2e, 0xc1, 0x81, 0x52, 0x07, 0x47, 0x19, 0x8d, 0x46, 0xa3, 0x4f, 0xa1, 0x12, 0x24, + 0x1c, 0x4b, 0xca, 0x22, 0xc7, 0x5e, 0xb1, 0x9a, 0xc5, 0xce, 0x07, 0x6a, 0xf9, 0x7f, 0x87, 0xcb, + 0x73, 0xca, 0x9d, 0xad, 0xb5, 0x74, 0xd1, 0x9d, 0x8a, 0xa1, 0x8f, 0x01, 0xfc, 0xbd, 0x24, 0x1a, + 0x79, 0x82, 0xbe, 0x22, 0x4e, 0x49, 0x83, 0x0c, 0x67, 0x55, 0xcf, 0xf7, 0xe9, 0x2b, 0x82, 0x1e, + 0x40, 0x85, 0x13, 0x1c, 0x3c, 0x14, 0x4f, 0x77, 0x9d, 0x59, 0x6d, 0xe5, 0x8f, 0x73, 0x1a, 0xaa, + 0x23, 0x6b, 0xed, 0x85, 0x7e, 0xeb, 0x59, 0x76, 0x64, 0x29, 0xc3, 0x14, 0xd4, 0xb8, 0x0d, 0xb6, + 0xf2, 0x1b, 0xaa, 0xc1, 0x6c, 0x2f, 0x3a, 0xc0, 0x21, 0x0d, 0xea, 0x33, 0x08, 0xa0, 0xdc, 0x65, + 0x61, 0x32, 0x8e, 0xea, 0x16, 0xaa, 0x42, 0xa9, 0x17, 0x05, 0xe4, 0x65, 0xbd, 0xb0, 0x61, 0x57, + 0xca, 0xf5, 0xd9, 0xc6, 0x0b, 0xa8, 0x6d, 0xb0, 0xc1, 0x16, 0x67, 0x43, 0x4e, 0x84, 0x40, 0x9f, + 0x40, 0xf9, 0x39, 0x1b, 0x78, 0x34, 0xd0, 0xe7, 0x55, 0xec, 0xcc, 0xa9, 0x0d, 0x8e, 0x0e, 0x97, + 0x4b, 0x1b, 0x6c, 0xd0, 0x5b, 0x73, 0x4b, 0xcf, 0xd9, 0xa0, 0x17, 0xa0, 0x26, 0x5c, 0xf3, 0x59, + 0x24, 0x39, 0x1d, 0x24, 0xda, 0x07, 0xea, 0x44, 0x0a, 0xa9, 0x32, 0x27, 0x56, 0x90, 0x03, 0xb6, + 0x08, 0x99, 0x74, 0x8a, 0x2b, 0x56, 0xb3, 0x94, 0x1d, 0xa7, 0x9a, 0x69, 0xbc, 0xae, 0x00, 0x52, + 0xfe, 0xed, 0x8d, 0x63, 0xc6, 0xe5, 0x1a, 0x96, 0x58, 0x87, 0xcb, 0x2a, 0xd4, 0x04, 0x1e, 0xc7, + 0x21, 0x31, 0x8e, 0x2a, 0xe4, 0x70, 0x60, 0x16, 0xb4, 0xa7, 0x1e, 0x43, 0x25, 0x4e, 0x75, 0x76, + 0xca, 0xda, 0x53, 0xab, 0xe7, 0x9f, 0x65, 0xce, 0xc0, 0xcc, 0x63, 0x19, 0x18, 0x3d, 0x86, 0x62, + 0xc2, 0xa9, 0x33, 0xab, 0xe3, 0xe1, 0x17, 0xe7, 0x73, 0xbc, 0xad, 0x6a, 0x6b, 0x9b, 0xd3, 0x47, + 0x91, 0xe4, 0x13, 0x57, 0x31, 0xa0, 0xcf, 0xa0, 0x6c, 0xd2, 0xd5, 0xa9, 0x68, 0x7d, 0x96, 0x73, + 0x5c, 0x69, 0xa2, 0xb4, 0x7a, 0x4f, 0xd7, 0x69, 0x48, 0xd6, 0xb5, 0x58, 0xaa, 0x49, 0x0a, 0x42, + 0x3b, 0x50, 0xd6, 0x21, 0x2a, 0x9c, 0xaa, 0x56, 0xe5, 0xfe, 0x95, 0x54, 0xd1, 0xd1, 0x2a, 0xb4, + 0x36, 0x9a, 0xd7, 0x72, 0x53, 0x36, 0xf4, 0x00, 0x7e, 0x24, 0x46, 0x34, 0xf6, 0xc6, 0x54, 0x08, + 0x1a, 0x0d, 0xbd, 0x5d, 0xc6, 0x09, 0x1d, 0x46, 0xde, 0x88, 0x4c, 0x84, 0x03, 0x2b, 0x56, 0xb3, + 0x92, 0x2a, 0xf2, 0xa1, 0x12, 0xfb, 0xca, 0x48, 0xad, 0x1b, 0xa1, 0x4d, 0x32, 0x11, 0xe8, 0x36, + 0xcc, 0xbd, 0xc0, 0x61, 0xa8, 0xe2, 0xfa, 0x09, 0x8e, 0x98, 0x70, 0x6a, 0xb9, 0xd8, 0x3d, 0xb9, + 0x84, 0xee, 0xc2, 0x02, 0xd7, 0x29, 0xb3, 0x85, 0x39, 0x0e, 0x43, 0x12, 0x52, 0x31, 0x76, 0xe6, + 0x72, 0x47, 0xf8, 0xf6, 0x32, 0xfa, 0x06, 0x80, 0x13, 0x91, 0x8c, 0x89, 0x17, 0x33, 0xe1, 0xcc, + 0x6b, 0xe3, 0x7f, 0x7d, 0x25, 0xe3, 0x5d, 0x0d, 0xdf, 0x62, 0xc6, 0x7e, 0xb7, 0xca, 0xb3, 0xdf, + 0x88, 0x00, 0x24, 0x82, 0x70, 0x4f, 0x17, 0x27, 0xe7, 0xfa, 0x8a, 0xd5, 0xac, 0x76, 0xd6, 0xd3, + 0x4c, 0xfd, 0x7c, 0x48, 0xe5, 0x5e, 0x32, 0x68, 0xf9, 0x6c, 0xdc, 0x9e, 0xee, 0x16, 0x0c, 0x8e, + 0xc7, 0xed, 0x78, 0x34, 0x6c, 0x0b, 0xe2, 0x27, 0x9c, 0xca, 0x49, 0xab, 0xff, 0xbb, 0x2f, 0xb7, + 0x05, 0xe1, 0x11, 0x1e, 0x93, 0x2d, 0xc5, 0xe6, 0x56, 0x15, 0xb3, 0x1e, 0x2e, 0x26, 0x50, 0x33, + 0x2a, 0xe9, 0x63, 0x40, 0xbf, 0x05, 0x5b, 0x55, 0x67, 0x9d, 0x41, 0x57, 0xab, 0x53, 0x96, 0xab, + 0x91, 0xe8, 0x13, 0x00, 0x89, 0xf9, 0x90, 0xc8, 0x2e, 0x0b, 0x85, 0x53, 0x58, 0x29, 0x36, 0xab, + 0xe9, 0x7a, 0x6e, 0x7e, 0x51, 0x40, 0x2d, 0x77, 0xee, 0xa8, 0x0e, 0xc5, 0x11, 0x99, 0xe8, 0x5d, + 0xab, 0xae, 0x1a, 0xa2, 0x27, 0x50, 0x3a, 0xc0, 0x61, 0x92, 0x55, 0xcc, 0xab, 0x85, 0x54, 0xce, + 0x22, 0xd7, 0xd0, 0xfc, 0xaa, 0x70, 0xdf, 0x5a, 0xfc, 0x25, 0x54, 0xb2, 0xb8, 0xcf, 0xef, 0x58, + 0x32, 0x3b, 0xde, 0xcc, 0xef, 0x58, 0xcd, 0xe3, 0x7e, 0x03, 0xf3, 0x27, 0xcf, 0xe9, 0x5d, 0xe8, + 0x62, 0x0e, 0xbd, 0x61, 0x57, 0x2c, 0x5d, 0xb1, 0x8a, 0x75, 0x7b, 0xc3, 0xae, 0xd8, 0xf5, 0xd2, + 0x86, 0x5d, 0x29, 0xd5, 0xcb, 0x1b, 0x76, 0xe5, 0x5a, 0x7d, 0xae, 0xf1, 0xaf, 0x02, 0x7c, 0xd4, + 0x97, 0x9c, 0xe0, 0x71, 0x2f, 0x1a, 0x12, 0xa1, 0x0a, 0xcf, 0xb4, 0x9e, 0xfc, 0x11, 0x6e, 0xc4, + 0x98, 0x4b, 0xaa, 0x26, 0x3d, 0x1c, 0x04, 0x2a, 0xe9, 0x89, 0x70, 0x2c, 0xed, 0xd3, 0x27, 0x2a, + 0x16, 0xfe, 0x7d, 0xb8, 0xbc, 0x7e, 0xa9, 0x58, 0xf0, 0xfd, 0x50, 0xf5, 0x5b, 0x82, 0xc7, 0x34, + 0x1a, 0xfa, 0x7e, 0xd8, 0xda, 0xca, 0x88, 0x1f, 0x1a, 0x5e, 0x17, 0xc5, 0xa7, 0x66, 0x88, 0x40, + 0x1d, 0x00, 0x21, 0x31, 0x97, 0x9e, 0x4a, 0x93, 0xf4, 0x24, 0x2e, 0x55, 0xd5, 0xab, 0x1a, 0xa6, + 0x66, 0x11, 0x87, 0x79, 0xb3, 0x71, 0x66, 0x81, 0xae, 0xa7, 0xd5, 0xce, 0x66, 0xaa, 0x7f, 0xf7, + 0xfb, 0xe9, 0x6f, 0x7c, 0x96, 0x29, 0x3f, 0x27, 0xf2, 0x3f, 0x1b, 0xff, 0x9c, 0x35, 0xad, 0x3c, + 0x89, 0xa7, 0xbe, 0xbc, 0x97, 0x75, 0x4f, 0x4b, 0x67, 0xe9, 0x47, 0x67, 0x54, 0xb8, 0xb7, 0x7b, + 0xe5, 0x17, 0x50, 0xa7, 0x91, 0xe4, 0x2c, 0x48, 0x7c, 0x12, 0x78, 0x06, 0x5f, 0xb8, 0x0c, 0xfe, + 0xfa, 0x31, 0xac, 0xaf, 0x99, 0xee, 0x41, 0x2d, 0x20, 0xbb, 0x38, 0x09, 0xa5, 0xa7, 0x4a, 0xb6, + 0x71, 0x01, 0x4a, 0x1b, 0x14, 0xac, 0x99, 0xa5, 0x6d, 0xb7, 0xe7, 0x42, 0x2a, 0xb6, 0xcd, 0x29, + 0xfa, 0xb3, 0x05, 0x37, 0x12, 0x4e, 0x85, 0x37, 0x98, 0x78, 0x21, 0xf3, 0x71, 0x48, 0xe5, 0xc4, + 0x1b, 0x1d, 0x38, 0xb6, 0x56, 0xe1, 0xf3, 0x8b, 0xaf, 0x23, 0xc7, 0xb6, 0xab, 0x62, 0x2f, 0x3a, + 0x93, 0x2f, 0x53, 0x86, 0xcd, 0x03, 0x53, 0x6b, 0x6f, 0x1e, 0x1d, 0x2e, 0xd7, 0xb7, 0xdd, 0x5e, + 0x7e, 0x69, 0xc7, 0xad, 0x27, 0xa7, 0x84, 0x91, 0x0b, 0xb5, 0xf1, 0x81, 0xef, 0x7b, 0xbb, 0x34, + 0x94, 0x84, 0xeb, 0xf6, 0x3f, 0x7f, 0x22, 0x0a, 0x32, 0xfb, 0xbf, 0xda, 0xe9, 0x76, 0xd7, 0xb5, + 0xd0, 0xb1, 0x65, 0xc7, 0x73, 0x2e, 0x28, 0x16, 0x33, 0x46, 0x5f, 0x00, 0x90, 0xc8, 0xe7, 0x93, + 0x58, 0xb7, 0x60, 0xd3, 0x04, 0x9b, 0x67, 0x50, 0xaa, 0x96, 0xf3, 0x68, 0x2a, 0xf8, 0x54, 0xff, + 0x15, 0x6e, 0x0e, 0x8b, 0x9e, 0xc2, 0xc2, 0x40, 0x5b, 0xeb, 0xe5, 0x22, 0xf5, 0x0a, 0xf7, 0x8f, + 0xeb, 0x06, 0xdd, 0x9f, 0xc6, 0xeb, 0x26, 0xa4, 0x53, 0x1e, 0x89, 0x02, 0x43, 0x57, 0xb9, 0x3c, + 0xdd, 0x9c, 0xc1, 0x3e, 0x8a, 0x02, 0x4d, 0xb6, 0x0d, 0xe5, 0x78, 0xe4, 0xd1, 0x20, 0xeb, 0x8c, + 0xf7, 0x2e, 0x7d, 0x66, 0x5b, 0xa3, 0x5e, 0x90, 0x36, 0xc5, 0xaa, 0xba, 0xc3, 0x6c, 0x6d, 0xf6, + 0xd6, 0x84, 0x5b, 0x8a, 0xd5, 0xf4, 0xa9, 0xde, 0x00, 0x3f, 0x54, 0x6f, 0xe8, 0xc2, 0x07, 0x67, + 0x86, 0xce, 0x19, 0xe5, 0xfa, 0xfc, 0xe2, 0x79, 0x1f, 0xe0, 0xd8, 0x96, 0x3c, 0xd2, 0x3e, 0x03, + 0x59, 0xc9, 0x21, 0x1b, 0xff, 0xb0, 0xa0, 0xee, 0x12, 0x21, 0x19, 0x27, 0x2a, 0x89, 0x0c, 0xc1, + 0xa7, 0x60, 0xab, 0x3c, 0x4c, 0x1b, 0xd4, 0x3b, 0xd2, 0x50, 0x8b, 0xa2, 0x87, 0x50, 0xda, 0xa5, + 0xea, 0x76, 0x62, 0x52, 0x77, 0xf5, 0xac, 0xcb, 0x8d, 0x6e, 0x18, 0x2e, 0xd9, 0x4f, 0x88, 0x90, + 0x3a, 0xea, 0xb2, 0x42, 0xa0, 0x91, 0xe8, 0x16, 0xd4, 0xb2, 0x5b, 0x57, 0x2f, 0x78, 0xa9, 0xd3, + 0x37, 0xbb, 0x46, 0xe4, 0x17, 0x1a, 0x7f, 0x2a, 0xc2, 0xf5, 0x54, 0xe5, 0x69, 0xe5, 0x59, 0x87, + 0x6b, 0xdc, 0x4c, 0x99, 0x68, 0xb2, 0x2e, 0x1f, 0x4d, 0xb5, 0x14, 0xa8, 0x63, 0xe9, 0x64, 0xce, + 0x14, 0xde, 0x23, 0x67, 0x7a, 0x50, 0xe6, 0x44, 0x5f, 0xa2, 0xcc, 0x53, 0xe2, 0x67, 0xef, 0xf4, + 0x48, 0xfa, 0xa2, 0x18, 0x91, 0x49, 0x76, 0xf5, 0x33, 0x04, 0xea, 0xea, 0x97, 0x06, 0xb8, 0x29, + 0x4a, 0x3f, 0xbf, 0xa8, 0x4f, 0x9f, 0xf0, 0xcb, 0x85, 0x11, 0xfe, 0x1e, 0x51, 0xf3, 0xb7, 0x02, + 0xdc, 0xe8, 0xc7, 0x21, 0x95, 0x0f, 0xa3, 0xa0, 0xef, 0x63, 0x29, 0xd3, 0xb7, 0xdc, 0x1f, 0xa0, + 0xac, 0x1f, 0x2b, 0x59, 0x07, 0x78, 0x70, 0xbe, 0xa6, 0x67, 0xc0, 0x33, 0xed, 0xb5, 0x3e, 0x5d, + 0xc5, 0x93, 0x39, 0xc2, 0x90, 0xe6, 0x7c, 0x5a, 0x78, 0x4f, 0x9f, 0x2e, 0x7a, 0xb0, 0xf0, 0xd6, + 0x6e, 0x68, 0x03, 0x66, 0x89, 0x7a, 0x9b, 0x90, 0x4c, 0xff, 0xdb, 0xef, 0xf4, 0xf4, 0x34, 0x69, + 0x52, 0xfe, 0x8c, 0xa0, 0xf1, 0x97, 0x22, 0xcc, 0x75, 0xfb, 0x3b, 0x5f, 0x73, 0x9a, 0x39, 0xe7, + 0x96, 0x6a, 0x4f, 0x42, 0xd2, 0xc8, 0xbc, 0x0b, 0x75, 0x62, 0x67, 0x31, 0x98, 0x5b, 0x40, 0x3f, + 0x81, 0x6b, 0xaa, 0x52, 0x78, 0xb1, 0x76, 0x8c, 0x89, 0xc2, 0xa9, 0xa0, 0xae, 0x21, 0x66, 0x01, + 0x7d, 0x06, 0xb3, 0xcc, 0x44, 0x9e, 0x4e, 0x96, 0xda, 0x99, 0x0d, 0xa3, 0xdb, 0xdf, 0x49, 0xc3, + 0x33, 0xd3, 0x30, 0xc5, 0x1c, 0xbf, 0x38, 0x39, 0x7b, 0x21, 0xd2, 0x67, 0x6a, 0xfe, 0xc5, 0xe9, + 0xb2, 0x17, 0x02, 0xfd, 0x1e, 0x16, 0x7c, 0x36, 0x8e, 0x55, 0xee, 0xa9, 0x0b, 0x92, 0xcf, 0x02, + 0xe2, 0xa7, 0xed, 0xe9, 0x82, 0xc7, 0xb1, 0x4a, 0x8f, 0xee, 0x31, 0x2c, 0xa5, 0xad, 0xe7, 0x98, + 0xba, 0x8a, 0xe8, 0x54, 0x8d, 0x2d, 0xff, 0x40, 0x35, 0xb6, 0xf1, 0x35, 0x2c, 0x74, 0x92, 0x50, + 0x19, 0x94, 0x3b, 0x8e, 0xe9, 0xe7, 0x02, 0xeb, 0x7b, 0x7f, 0x2e, 0xb8, 0xbd, 0x0a, 0xd7, 0x4f, + 0x99, 0x8a, 0x2a, 0x60, 0x3f, 0x61, 0x11, 0xa9, 0xcf, 0xa8, 0xd1, 0xe3, 0x57, 0x34, 0xae, 0x5b, + 0x9d, 0x3b, 0xaf, 0xff, 0xbb, 0x34, 0xf3, 0xfa, 0x68, 0xc9, 0xfa, 0xf6, 0x68, 0xc9, 0xfa, 0xee, + 0x68, 0xc9, 0xfa, 0xcf, 0xd1, 0x92, 0xf5, 0xd7, 0x37, 0x4b, 0x33, 0xdf, 0xbe, 0x59, 0x9a, 0xf9, + 0xee, 0xcd, 0xd2, 0xcc, 0x37, 0xb5, 0xdc, 0x17, 0x99, 0xff, 0x07, 0x00, 0x00, 0xff, 0xff, 0xc0, + 0xb0, 0xb4, 0x69, 0x3e, 0x12, 0x00, 0x00, } diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 7008638e3645..6027e9a4472d 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -140,6 +140,8 @@ message StreamIngestionDataSpec { repeated string partition_addresses = 1 [(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl.PartitionAddress",(gogoproto.nullable) = false]; // The processor will ingest events from StartTime onwards. optional util.hlc.Timestamp start_time = 2 [(gogoproto.nullable) = false]; + // StreamAddress locate the stream so that a stream client can be initialized. + optional string stream_address = 3 [(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl.StreamAddress",(gogoproto.nullable) = false]; } message BackupDataSpec {