Skip to content

Commit

Permalink
streamclient: change client example from Test to Example
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
pbardea committed Jan 22, 2021
1 parent 23dbf05 commit be6163f
Showing 1 changed file with 28 additions and 19 deletions.
47 changes: 28 additions & 19 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package streamclient

import (
"context"
"fmt"
"net/url"
"testing"
"time"
Expand Down Expand Up @@ -42,49 +43,57 @@ func (sc testStreamClient) ConsumePartition(
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Value: roachpb.Value{
RawBytes: []byte("value 1"),
RawBytes: []byte("value_1"),
Timestamp: hlc.Timestamp{WallTime: 1},
},
}

events := make(chan streamingccl.Event, 100)
events := make(chan streamingccl.Event, 2)
events <- streamingccl.MakeKVEvent(sampleKV)
events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100})
close(events)

return events, nil
}

// TestExampleClientUsage serves as documentation to indicate how a stream
// ExampleClientUsage serves as documentation to indicate how a stream
// client could be used.
func TestExampleClientUsage(t *testing.T) {
ctx := context.Background()
func ExampleClient() {
client := testStreamClient{}
sa := streamingccl.StreamAddress("s3://my_bucket/my_stream")
topology, err := client.GetTopology(sa)
require.NoError(t, err)
topology, err := client.GetTopology("s3://my_bucket/my_stream")
if err != nil {
panic(err)
}

startTimestamp := timeutil.Now()
numReceivedEvents := 0

for _, partition := range topology.Partitions {
eventCh, err := client.ConsumePartition(ctx, partition, startTimestamp)
require.NoError(t, err)
eventCh, err := client.ConsumePartition(context.Background(), partition, startTimestamp)
if err != nil {
panic(err)
}

// This example looks for the closing of the channel to terminate the test,
// but an ingestion job should look for another event such as the user
// cutting over to the new cluster to move to the next stage.
for {
_, ok := <-eventCh
if !ok {
break
for event := range eventCh {
switch event.Type() {
case streamingccl.KVEvent:
kv := event.GetKV()
fmt.Printf("%s->%s@%d\n", kv.Key.String(), string(kv.Value.RawBytes), kv.Value.Timestamp.WallTime)
case streamingccl.CheckpointEvent:
fmt.Printf("resolved %d\n", event.GetResolved().WallTime)
default:
panic(fmt.Sprintf("unexpected event type %v", event.Type()))
}
numReceivedEvents++
}
}

// We expect 4 events, 2 from each partition.
require.Equal(t, 4, numReceivedEvents)
// Output:
// "key_1"->value_1@1
// resolved 100
// "key_1"->value_1@1
// resolved 100
}

// Ensure that all implementations specified in this test properly close the
Expand Down

0 comments on commit be6163f

Please sign in to comment.