58617: sql: add qualification prefix for user-defined schema names r=the-ericwang35 a=the-ericwang35

Fixes #57738.

Previously, event logs did not capture the qualified schema names
for create_schema, drop_schema, rename_schema, and alter_schema_owner events.
This PR changes these event logs to use the qualified schema names.
Tests were also updated to reflect these changes.

Release note (bug fix): Event logs for user-defined schema operations (create, drop, rename, alter owner) now record the qualified schema name.
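For illustration only (not code from this PR): a qualified schema name is the schema name prefixed with its database, which the `tree` package can render from an `ObjectNamePrefix`. The field values and the use of `tree.AsString` below are assumptions made for this sketch, not taken from the event-log change itself.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

func main() {
	// Hypothetical values: a user-defined schema "myschema" in database
	// "defaultdb", rendered with its qualification prefix.
	prefix := tree.ObjectNamePrefix{
		CatalogName:     "defaultdb",
		SchemaName:      "myschema",
		ExplicitCatalog: true,
		ExplicitSchema:  true,
	}
	fmt.Println(tree.AsString(&prefix)) // defaultdb.myschema
}
```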

59139: streamclient: add random stream client r=pbardea a=pbardea

See individual commits, but this PR does some cleanup while introducing a
stream client implementation that randomly generates rows.

I broke each change down into its own commit for review, but let me know
if splitting this into separate PRs would be helpful.

59276: opt: add opttester command to check rule applications and memo size r=rytaft a=DrewKimball

Previously, it was difficult to test that the number of rule
applications and/or memo groups remains reasonable during
optimization of a query.

This patch adds the `check-size` command, which outputs the number
of rules applied and memo groups created during optimization.
The `rule-limit` and `group-limit` flags can be used to report an
error if the number of rule applications or memo groups exceeds
the given limit.
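As a rough sketch of how this might look in a datadriven test file (the flag spelling and the output shape are assumed here, not copied from the patch):

```
# Hypothetical test case: fail if optimizing this query applies more than
# 50 rules or creates more than 30 memo groups.
check-size rule-limit=50 group-limit=30
SELECT a, b FROM ab WHERE a = 1
----
<number of rules applied and memo groups created, or an error if a limit is exceeded>
```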

Release note: None

Fixes #59192

Co-authored-by: Eric Wang <[email protected]>
Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Andrew Kimball <[email protected]>
4 people committed Jan 25, 2021
4 parents 8efa63d + a9e25d6 + be6163f + bab2c61 commit b51d9f1
Showing 21 changed files with 1,005 additions and 585 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/addresses.go
@@ -8,10 +8,17 @@
 
 package streamingccl
 
+import "net/url"
+
 // StreamAddress is the location of the stream. The topology of a stream should
 // be resolvable given a stream address.
 type StreamAddress string
 
+// URL parses the stream address as a URL.
+func (sa StreamAddress) URL() (*url.URL, error) {
+	return url.Parse(string(sa))
+}
+
 // PartitionAddress is the address where the stream client should be able to
 // read the events produced by a partition of a stream.
 //
16 changes: 15 additions & 1 deletion pkg/ccl/streamingccl/streamclient/BUILD.bazel
@@ -4,11 +4,25 @@ go_library(
     name = "streamclient",
     srcs = [
         "client.go",
+        "random_stream_client.go",
         "stream_client.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient",
     visibility = ["//visibility:public"],
-    deps = ["//pkg/ccl/streamingccl"],
+    deps = [
+        "//pkg/ccl/streamingccl",
+        "//pkg/keys",
+        "//pkg/roachpb",
+        "//pkg/sql",
+        "//pkg/sql/catalog/descpb",
+        "//pkg/sql/catalog/systemschema",
+        "//pkg/sql/catalog/tabledesc",
+        "//pkg/sql/rowenc",
+        "//pkg/sql/sem/tree",
+        "//pkg/util/hlc",
+        "//pkg/util/syncutil",
+        "//pkg/util/timeutil",
+    ],
 )
 
 go_test(
30 changes: 29 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
@@ -9,6 +9,7 @@
 package streamclient
 
 import (
+	"context"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
@@ -24,5 +24,32 @@ type Client interface {
 
 	// ConsumePartition returns a channel on which we can start listening for
 	// events from a given partition that occur after a startTime.
-	ConsumePartition(address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error)
+	//
+	// Canceling the context will stop reading the partition and close the event
+	// channel.
+	// TODO: Add an error channel so that the client can report any errors
+	// encountered while reading the stream.
+	ConsumePartition(ctx context.Context, address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error)
 }
+
+// NewStreamClient creates a new stream client based on the stream
+// address.
+func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) {
+	var streamClient Client
+	streamURL, err := streamAddress.URL()
+	if err != nil {
+		return streamClient, err
+	}
+
+	switch streamURL.Scheme {
+	case TestScheme:
+		streamClient, err = newRandomStreamClient(streamURL)
+		if err != nil {
+			return streamClient, err
+		}
+	default:
+		streamClient = &mockClient{}
+	}
+
+	return streamClient, nil
+}
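For context, a minimal usage sketch of the constructor above (not part of this commit). It assumes the `test://` scheme maps to the random stream client, as in the tests below, and that the random client keeps emitting events until its context is canceled.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// A "test://" stream address selects the random stream client; any other
	// scheme currently falls back to the mock client.
	client, err := streamclient.NewStreamClient("test://42")
	if err != nil {
		panic(err)
	}

	eventCh, err := client.ConsumePartition(ctx, "test://42/", timeutil.Now())
	if err != nil {
		panic(err)
	}

	// Read a few randomly generated events, then cancel the context, which
	// should close the event channel.
	for i := 0; i < 3; i++ {
		fmt.Println((<-eventCh).Type())
	}
	cancel()
	for range eventCh {
	}
}
```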
76 changes: 57 additions & 19 deletions pkg/ccl/streamingccl/streamclient/client_test.go
@@ -9,6 +9,9 @@
 package streamclient
 
 import (
+	"context"
+	"fmt"
+	"net/url"
 	"testing"
 	"time"
 
@@ -35,51 +35,86 @@ func (sc testStreamClient) GetTopology(
 
 // ConsumePartition implements the Client interface.
 func (sc testStreamClient) ConsumePartition(
-	_ streamingccl.PartitionAddress, _ time.Time,
+	_ context.Context, _ streamingccl.PartitionAddress, _ time.Time,
 ) (chan streamingccl.Event, error) {
 	sampleKV := roachpb.KeyValue{
 		Key: []byte("key_1"),
 		Value: roachpb.Value{
-			RawBytes: []byte("value 1"),
+			RawBytes: []byte("value_1"),
 			Timestamp: hlc.Timestamp{WallTime: 1},
 		},
 	}
 
-	events := make(chan streamingccl.Event, 100)
+	events := make(chan streamingccl.Event, 2)
 	events <- streamingccl.MakeKVEvent(sampleKV)
-	events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
+	events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100})
 	close(events)
 
 	return events, nil
 }
 
-// TestExampleClientUsage serves as documentation to indicate how a stream
+// ExampleClientUsage serves as documentation to indicate how a stream
 // client could be used.
-func TestExampleClientUsage(t *testing.T) {
+func ExampleClient() {
 	client := testStreamClient{}
-	sa := streamingccl.StreamAddress("s3://my_bucket/my_stream")
-	topology, err := client.GetTopology(sa)
-	require.NoError(t, err)
+	topology, err := client.GetTopology("s3://my_bucket/my_stream")
+	if err != nil {
+		panic(err)
+	}
 
 	startTimestamp := timeutil.Now()
-	numReceivedEvents := 0
 
 	for _, partition := range topology.Partitions {
-		eventCh, err := client.ConsumePartition(partition, startTimestamp)
-		require.NoError(t, err)
+		eventCh, err := client.ConsumePartition(context.Background(), partition, startTimestamp)
+		if err != nil {
+			panic(err)
+		}
 
 		// This example looks for the closing of the channel to terminate the test,
 		// but an ingestion job should look for another event such as the user
 		// cutting over to the new cluster to move to the next stage.
-		for {
-			_, ok := <-eventCh
-			if !ok {
-				break
+		for event := range eventCh {
+			switch event.Type() {
+			case streamingccl.KVEvent:
+				kv := event.GetKV()
+				fmt.Printf("%s->%s@%d\n", kv.Key.String(), string(kv.Value.RawBytes), kv.Value.Timestamp.WallTime)
+			case streamingccl.CheckpointEvent:
+				fmt.Printf("resolved %d\n", event.GetResolved().WallTime)
+			default:
+				panic(fmt.Sprintf("unexpected event type %v", event.Type()))
 			}
-			numReceivedEvents++
 		}
 	}
 
-	// We expect 4 events, 2 from each partition.
-	require.Equal(t, 4, numReceivedEvents)
+	// Output:
+	// "key_1"->value_1@1
+	// resolved 100
+	// "key_1"->value_1@1
+	// resolved 100
 }
+
+// Ensure that all implementations specified in this test properly close the
+// eventChannel when the given context is canceled.
+func TestImplementationsCloseChannel(t *testing.T) {
+	streamURL, err := url.Parse("test://52")
+	require.NoError(t, err)
+	randomClient, err := newRandomStreamClient(streamURL)
+	require.NoError(t, err)
+
+	// TODO: Add SQL client and file client here when implemented.
+	impls := []Client{
+		&mockClient{},
+		randomClient,
+	}
+
+	for _, impl := range impls {
+		ctx, cancel := context.WithCancel(context.Background())
+		eventCh, err := impl.ConsumePartition(ctx, "test://53/", timeutil.Now())
+		require.NoError(t, err)
+
+		// Ensure that the eventCh closes when the context is canceled.
+		cancel()
+		for range eventCh {
+		}
+	}
+}