Skip to content

Commit

Permalink
streamclient: close event channels on context cancellation
Browse files Browse the repository at this point in the history
Stream clients now take in a context when opening an event stream for a
given partition. To close the event stream returned by the client, the
given context should be cancelled.

Release note: None
  • Loading branch information
pbardea committed Jan 22, 2021
1 parent 4d419c8 commit be9e8cc
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 15 deletions.
8 changes: 7 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package streamclient

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
Expand All @@ -24,5 +25,10 @@ type Client interface {

// ConsumePartition returns a channel on which we can start listening for
// events from a given partition that occur after a startTime.
ConsumePartition(address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error)
//
// Canceling the context will stop reading the partition and close the event
// channel.
// TODO: Add an error channel so that the client can report any errors
// encountered while reading the stream.
ConsumePartition(ctx context.Context, address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error)
}
26 changes: 24 additions & 2 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package streamclient

import (
"context"
"testing"
"time"

Expand All @@ -35,7 +36,7 @@ func (sc testStreamClient) GetTopology(

// ConsumePartition implements the Client interface.
func (sc testStreamClient) ConsumePartition(
_ streamingccl.PartitionAddress, _ time.Time,
_ context.Context, _ streamingccl.PartitionAddress, _ time.Time,
) (chan streamingccl.Event, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Expand All @@ -56,6 +57,7 @@ func (sc testStreamClient) ConsumePartition(
// TestExampleClientUsage serves as documentation to indicate how a stream
// client could be used.
func TestExampleClientUsage(t *testing.T) {
ctx := context.Background()
client := testStreamClient{}
sa := streamingccl.StreamAddress("s3://my_bucket/my_stream")
topology, err := client.GetTopology(sa)
Expand All @@ -65,7 +67,7 @@ func TestExampleClientUsage(t *testing.T) {
numReceivedEvents := 0

for _, partition := range topology.Partitions {
eventCh, err := client.ConsumePartition(partition, startTimestamp)
eventCh, err := client.ConsumePartition(ctx, partition, startTimestamp)
require.NoError(t, err)

// This example looks for the closing of the channel to terminate the test,
Expand All @@ -83,3 +85,23 @@ func TestExampleClientUsage(t *testing.T) {
// We expect 4 events, 2 from each partition.
require.Equal(t, 4, numReceivedEvents)
}

// Ensure that all implementations specified in this test properly close the
// eventChannel when the given context is canceled.
func TestImplementationsCloseChannel(t *testing.T) {
// TODO: Add SQL client and file client here when implemented.
impls := []Client{
&client{},
}

for _, impl := range impls {
ctx, cancel := context.WithCancel(context.Background())
eventCh, err := impl.ConsumePartition(ctx, "test://53/", timeutil.Now())
require.NoError(t, err)

// Ensure that the eventCh closes when the context is canceled.
cancel()
for range eventCh {
}
}
}
7 changes: 6 additions & 1 deletion pkg/ccl/streamingccl/streamclient/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package streamclient

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
Expand All @@ -33,8 +34,12 @@ func (m *client) GetTopology(_ streamingccl.StreamAddress) (streamingccl.Topolog

// ConsumePartition implements the Client interface.
func (m *client) ConsumePartition(
_ streamingccl.PartitionAddress, _ time.Time,
ctx context.Context, _ streamingccl.PartitionAddress, _ time.Time,
) (chan streamingccl.Event, error) {
eventCh := make(chan streamingccl.Event)
go func() {
<-ctx.Done()
close(eventCh)
}()
return eventCh, nil
}
20 changes: 10 additions & 10 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) context.Context
startTime := timeutil.Unix(0 /* sec */, sip.spec.StartTime.WallTime)
eventChs := make(map[streamingccl.PartitionAddress]chan streamingccl.Event)
for _, partitionAddress := range sip.spec.PartitionAddresses {
eventCh, err := sip.client.ConsumePartition(partitionAddress, startTime)
eventCh, err := sip.client.ConsumePartition(ctx, partitionAddress, startTime)
if err != nil {
sip.ingestionErr = errors.Wrapf(err, "consuming partition %v", partitionAddress)
}
Expand Down Expand Up @@ -216,17 +216,17 @@ func merge(
for partition, eventCh := range partitionStreams {
go func(partition streamingccl.PartitionAddress, eventCh <-chan streamingccl.Event) {
defer wg.Done()
for {
for event := range eventCh {
pe := partitionEvent{
Event: event,
partition: partition,
}

select {
case event, ok := <-eventCh:
if !ok {
return
}
merged <- partitionEvent{
Event: event,
partition: partition,
}
case merged <- pe:
case <-ctx.Done():
// TODO: Add ctx.Err() to an error channel once ConsumePartition
// supports an error ch.
return
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (m *mockStreamClient) GetTopology(

// ConsumePartition implements the StreamClient interface.
func (m *mockStreamClient) ConsumePartition(
_ streamingccl.PartitionAddress, _ time.Time,
_ context.Context, _ streamingccl.PartitionAddress, _ time.Time,
) (chan streamingccl.Event, error) {
eventCh := make(chan streamingccl.Event, len(m.partitionEvents))

Expand Down

0 comments on commit be9e8cc

Please sign in to comment.