Skip to content

Commit

Permalink
streamclient: add random stream client
Browse files Browse the repository at this point in the history
This commit introduces a new stream client implementation that generates
events of a specific schema for a table ID that is specified by the
stream URI. Properties of the stream, such as the frequency of the
events and the range of the randomly generated KVs can be controlled
with the appropriate parameters specified in the stream address.

To use the new stream client, the `NewStreamClient` constructor has been
modified to accept a stream address. The stream address allows the
client to determine which client implementation should be used.

Further, the addition of this client exposed a bug in the SST batcher
which rejects batches that modify the same key more than once, even if
disallowShadowing is set to false.

Release note: None
  • Loading branch information
pbardea committed Jan 22, 2021
1 parent be9e8cc commit baf8928
Show file tree
Hide file tree
Showing 14 changed files with 689 additions and 188 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@

package streamingccl

import "net/url"

// StreamAddress identifies where a stream lives. Given a stream address, it
// should be possible to resolve the topology of the stream.
type StreamAddress string

// URL interprets the stream address as a URL. It returns the parsed URL, or the
// parse error when the address is not a well-formed URL.
func (sa StreamAddress) URL() (*url.URL, error) {
	raw := string(sa)
	parsed, err := url.Parse(raw)
	return parsed, err
}

// PartitionAddress is the address where the stream client should be able to
// read the events produced by a partition of a stream.
//
Expand Down
16 changes: 15 additions & 1 deletion pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,25 @@ go_library(
name = "streamclient",
srcs = [
"client.go",
"random_stream_client.go",
"stream_client.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient",
visibility = ["//visibility:public"],
deps = ["//pkg/ccl/streamingccl"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/util/hlc",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
],
)

go_test(
Expand Down
22 changes: 22 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,25 @@ type Client interface {
// encountered while reading the stream.
ConsumePartition(ctx context.Context, address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error)
}

// NewStreamClient builds the stream client implementation selected by the
// scheme of the given stream address. An address using TestScheme yields a
// random stream client configured from the address; any other scheme yields
// the default client. An error is returned when the address cannot be parsed
// as a URL or when the random stream client cannot be constructed.
func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) {
	parsedURL, err := streamAddress.URL()
	if err != nil {
		return nil, err
	}

	if parsedURL.Scheme == TestScheme {
		return newRandomStreamClient(parsedURL)
	}
	return &client{}, nil
}
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package streamclient

import (
"context"
"net/url"
"testing"
"time"

Expand Down Expand Up @@ -89,9 +90,15 @@ func TestExampleClientUsage(t *testing.T) {
// Ensure that all implementations specified in this test properly close the
// eventChannel when the given context is canceled.
func TestImplementationsCloseChannel(t *testing.T) {
streamURL, err := url.Parse("test://52")
require.NoError(t, err)
randomClient, err := newRandomStreamClient(streamURL)
require.NoError(t, err)

// TODO: Add SQL client and file client here when implemented.
impls := []Client{
&client{},
randomClient,
}

for _, impl := range impls {
Expand Down
256 changes: 256 additions & 0 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamclient

import (
"context"
"math/rand"
"net/url"
"strconv"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const (
	// RandomStreamSchema is the schema of the KVs emitted by the random stream
	// client.
	RandomStreamSchema = "CREATE TABLE test (k INT PRIMARY KEY, v INT)"

	// TestScheme is the URI scheme used to create a test load.
	TestScheme = "test"
	// ValueRangeKey is the query parameter that controls the range of the
	// randomly generated keys and values. Both are drawn from
	// [0, VALUE_RANGE), so the parameter must be a positive integer.
	ValueRangeKey = "VALUE_RANGE"
	// KVFrequency is the query parameter that sets the interval, in
	// nanoseconds, between two randomly generated KV events. Despite the
	// name, a larger value means fewer events per second.
	KVFrequency = "KV_FREQUENCY"
	// KVsPerCheckpoint is the query parameter that controls approximately how
	// many KV events are emitted between two checkpoint events.
	KVsPerCheckpoint = "KVS_PER_CHECKPOINT"
)

// randomStreamConfig specifies the variables that control the rate and type of
// events that the generated stream emits.
type randomStreamConfig struct {
	// valueRange is the exclusive upper bound of generated keys and values.
	valueRange int
	// kvFrequency is the interval between two generated KV events.
	kvFrequency time.Duration
	// kvsPerCheckpoint is the approximate number of KV events emitted between
	// two checkpoint events.
	kvsPerCheckpoint int
}

// invalidStreamParamError reports a stream URI query parameter whose value
// parsed as an integer but is not usable by the random stream client.
type invalidStreamParamError struct {
	key, value, reason string
}

// Error implements the error interface.
func (e *invalidStreamParamError) Error() string {
	return "invalid " + e.key + " " + strconv.Quote(e.value) + ": " + e.reason
}

// parseRandomStreamConfig extracts the random stream configuration from the
// query parameters of streamURL. Parameters that are absent or empty keep
// their defaults (value range 100, one KV every 10µs, 100 KVs per checkpoint).
//
// An error is returned, together with the zero config, when a parameter is not
// an integer or when VALUE_RANGE is not positive.
func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
	c := randomStreamConfig{
		valueRange:       100,
		kvFrequency:      10 * time.Microsecond,
		kvsPerCheckpoint: 100,
	}

	// Query re-parses the raw query string on every call, so do it once.
	params := streamURL.Query()

	if valueRangeStr := params.Get(ValueRangeKey); valueRangeStr != "" {
		valueRange, err := strconv.Atoi(valueRangeStr)
		if err != nil {
			return randomStreamConfig{}, err
		}
		// The value range is handed to rand.Intn, which panics on
		// non-positive arguments; reject those here instead.
		if valueRange <= 0 {
			return randomStreamConfig{}, &invalidStreamParamError{
				key:    ValueRangeKey,
				value:  valueRangeStr,
				reason: "must be a positive integer",
			}
		}
		c.valueRange = valueRange
	}

	if kvFreqStr := params.Get(KVFrequency); kvFreqStr != "" {
		kvFreq, err := strconv.Atoi(kvFreqStr)
		if err != nil {
			return randomStreamConfig{}, err
		}
		// Only store the interval once it is known to be valid.
		c.kvFrequency = time.Duration(kvFreq)
	}

	if kvsPerCheckpointStr := params.Get(KVsPerCheckpoint); kvsPerCheckpointStr != "" {
		kvsPerCheckpoint, err := strconv.Atoi(kvsPerCheckpointStr)
		if err != nil {
			return randomStreamConfig{}, err
		}
		c.kvsPerCheckpoint = kvsPerCheckpoint
	}

	return c, nil
}

// randomStreamClient is a temporary stream client implementation that generates
// random events.
//
// It expects a table with the schema `RandomStreamSchema` to already exist,
// with table ID `<table_id>` to be used in the URI. Opening the stream client
// on the URI 'test://<table_id>' will generate random events into this table.
//
// TODO: Move this over to a _test file in the ingestion package when there is a
// real stream client implementation.
type randomStreamClient struct {
// baseDesc is the table descriptor that generated keys and values are
// encoded against.
baseDesc *tabledesc.Mutable
// config holds the stream parameters parsed from the stream URI.
config randomStreamConfig

// mu embeds the mutex that is meant to protect the fields declared inside
// it.
mu struct {
syncutil.Mutex

// interceptors can be registered (see RegisterInterception) to peek at
// every event generated by this client.
interceptors []func(streamingccl.Event)
}
}

// Compile-time check that *randomStreamClient satisfies the Client interface.
var _ Client = &randomStreamClient{}

// newRandomStreamClient constructs a stream client that emits random events
// for a table with an integer key and an integer value. The ID of that table
// is read from the host component of streamURL, and the remaining stream
// parameters are read from its query string.
//
// An error is returned when the host is not an integer, when the test table
// descriptor cannot be created, or when the query parameters are invalid.
func newRandomStreamClient(streamURL *url.URL) (Client, error) {
	id, err := strconv.Atoi(streamURL.Host)
	if err != nil {
		return nil, err
	}

	const parentID = 50 /* defaultdb */
	desc, err := sql.CreateTestTableDescriptor(
		context.Background(),
		parentID,
		descpb.ID(id),
		RandomStreamSchema,
		systemschema.JobsTable.Privileges,
	)
	if err != nil {
		return nil, err
	}

	cfg, err := parseRandomStreamConfig(streamURL)
	if err != nil {
		return nil, err
	}

	c := &randomStreamClient{baseDesc: desc, config: cfg}
	return c, nil
}

// GetTopology implements the Client interface.
//
// The random stream client does not provide a topology yet: calling this
// method always panics with "not yet implemented" and never returns.
func (m *randomStreamClient) GetTopology(
_ streamingccl.StreamAddress,
) (streamingccl.Topology, error) {
panic("not yet implemented")
}

// ConsumePartition implements the Client interface.
//
// It starts a goroutine that emits a randomly generated KV event every
// config.kvFrequency and a checkpoint event every
// config.kvFrequency * config.kvsPerCheckpoint on the returned channel. The
// goroutine exits, closing the channel, once ctx is canceled. Each event that
// has been delivered on the channel is then passed to every registered
// interceptor.
//
// The partition address is ignored. The method panics if startTime lies in
// the future; the returned error is always nil.
func (m *randomStreamClient) ConsumePartition(
	ctx context.Context, _ streamingccl.PartitionAddress, startTime time.Time,
) (chan streamingccl.Event, error) {
	if startTime.After(timeutil.Now()) {
		panic("cannot start random stream client event stream in the future")
	}
	eventCh := make(chan streamingccl.Event)
	lastResolvedTime := startTime

	go func() {
		defer close(eventCh)

		// rand is not thread safe, so create a random source for each partition.
		r := rand.New(rand.NewSource(timeutil.Now().UnixNano()))
		kvInterval := m.config.kvFrequency
		resolvedInterval := kvInterval * time.Duration(m.config.kvsPerCheckpoint)

		// Both timers start at zero so that the first KV and the first
		// checkpoint are produced immediately.
		kvTimer := timeutil.NewTimer()
		kvTimer.Reset(0)
		defer kvTimer.Stop()

		resolvedTimer := timeutil.NewTimer()
		resolvedTimer.Reset(0)
		defer resolvedTimer.Stop()

		for {
			var event streamingccl.Event
			select {
			case <-kvTimer.C:
				kvTimer.Read = true
				event = streamingccl.MakeKVEvent(m.makeRandomKey(r, lastResolvedTime))
				kvTimer.Reset(kvInterval)
			case <-resolvedTimer.C:
				resolvedTimer.Read = true
				resolvedTime := timeutil.Now()
				hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()}
				event = streamingccl.MakeCheckpointEvent(hlcResolvedTime)
				lastResolvedTime = resolvedTime
				resolvedTimer.Reset(resolvedInterval)
			case <-ctx.Done():
				// Stop promptly on cancellation instead of waiting for the
				// next timer to fire, which may be arbitrarily far away.
				return
			}

			// TODO: Consider keeping an in-memory copy so that tests can verify
			// that the data we've ingested is correct.
			select {
			case eventCh <- event:
			case <-ctx.Done():
				return
			}

			m.notifyInterceptors(event)
		}
	}()

	return eventCh, nil
}

// notifyInterceptors passes event to every registered, non-nil interceptor.
// The interceptor list is only read while m.mu is held, so concurrent calls
// to RegisterInterception do not race with it; holding the lock for the whole
// loop also keeps interceptor invocations serialized across partitions.
func (m *randomStreamClient) notifyInterceptors(event streamingccl.Event) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, interceptor := range m.mu.interceptors {
		if interceptor != nil {
			interceptor(event)
		}
	}
}

// makeRandomKey generates a KV for the client's table. The primary key and
// the value are both random integers in [0, config.valueRange), and the value
// carries a random timestamp in [minTs, now). If the clock has not advanced
// past minTs, the timestamp is exactly minTs.
//
// All randomness is drawn from r, so callers control the source. The method
// panics if the key or the value cannot be encoded.
func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachpb.KeyValue {
	tableDesc := m.baseDesc

	// Create a key holding a random integer.
	k, err := rowenc.TestingMakePrimaryIndexKey(tableDesc, r.Intn(m.config.valueRange))
	if err != nil {
		panic(err)
	}
	k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID))

	// Create a value holding a random integer.
	valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange)))
	valueBuf, err := rowenc.EncodeTableValue(
		[]byte(nil), tableDesc.Columns[1].ID, valueDatum, []byte(nil))
	if err != nil {
		panic(err)
	}
	var v roachpb.Value
	v.SetTuple(valueBuf)
	v.ClearChecksum()
	v.InitChecksum(k)

	// Generate a timestamp between minTs and now(). The arithmetic stays in
	// int64 so nanosecond timestamps are not truncated on 32-bit platforms,
	// and the random offset is only drawn when the window is non-empty
	// because Int63n panics on non-positive arguments.
	minNanos := minTs.UnixNano()
	wallTime := minNanos
	if window := timeutil.Now().UnixNano() - minNanos; window > 0 {
		wallTime += r.Int63n(window)
	}
	v.Timestamp = hlc.Timestamp{WallTime: wallTime}

	return roachpb.KeyValue{
		Key:   k,
		Value: v,
	}
}

// RegisterInterception implements streamingest.interceptableStreamClient. It
// appends f to the list of interceptors while holding the client's mutex.
func (m *randomStreamClient) RegisterInterception(f func(event streamingccl.Event)) {
	m.mu.Lock()
	registered := append(m.mu.interceptors, f)
	m.mu.interceptors = registered
	m.mu.Unlock()
}
5 changes: 0 additions & 5 deletions pkg/ccl/streamingccl/streamclient/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ type client struct{}

var _ Client = &client{}

// NewStreamClient returns a new mock stream client.
func NewStreamClient() Client {
return &client{}
}

// GetTopology implements the Client interface.
func (m *client) GetTopology(_ streamingccl.StreamAddress) (streamingccl.Topology, error) {
return streamingccl.Topology{
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ go_test(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -60,12 +61,14 @@ go_test(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree",
"//pkg/testutils",
"//pkg/testutils/distsqlutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/timeutil",
Expand Down
Loading

0 comments on commit baf8928

Please sign in to comment.