From ff40d73e7dbb5199df480d2bf1041176eb0c9c28 Mon Sep 17 00:00:00 2001 From: Anne Zhu Date: Mon, 28 Jun 2021 00:46:57 -0400 Subject: [PATCH] streamingccl: add GenerationEvent type Add `GenerationEvent` as a possible event type to be emitted over a cluster stream. When a `GenerationEvent` is emitted, we should be able to get its topology as well as the start time of the new generation. Release note: None --- pkg/ccl/streamingccl/event.go | 28 +++++++++++++++++++ .../streamingccl/streamclient/client_test.go | 2 ++ 2 files changed, 30 insertions(+) diff --git a/pkg/ccl/streamingccl/event.go b/pkg/ccl/streamingccl/event.go index b326d61d1fd5..77f4800f5399 100644 --- a/pkg/ccl/streamingccl/event.go +++ b/pkg/ccl/streamingccl/event.go @@ -23,6 +23,9 @@ const ( // CheckpointEvent indicates that GetResolved will be meaningful. The resolved // timestamp indicates that all KVs have been emitted up to this timestamp. CheckpointEvent + // GenerationEvent indicates that the stream should start ingesting with the + // updated topology. + GenerationEvent ) // Event describes an event emitted by a cluster to cluster stream. Its Type @@ -84,6 +87,26 @@ func (ce checkpointEvent) GetResolved() *hlc.Timestamp { return &ce.resolvedTimestamp } +// generationEvent indicates that the topology of the stream has changed. +type generationEvent struct{} + +var _ Event = generationEvent{} + +// Type implements the Event interface. +func (ge generationEvent) Type() EventType { + return GenerationEvent +} + +// GetKV implements the Event interface. +func (ge generationEvent) GetKV() *roachpb.KeyValue { + return nil +} + +// GetResolved implements the Event interface. +func (ge generationEvent) GetResolved() *hlc.Timestamp { + return nil +} + // MakeKVEvent creates an Event from a KV. func MakeKVEvent(kv roachpb.KeyValue) Event { return kvEvent{kv: kv} @@ -93,3 +116,8 @@ func MakeKVEvent(kv roachpb.KeyValue) Event { func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp) Event { return checkpointEvent{resolvedTimestamp: resolvedTimestamp} } + +// MakeGenerationEvent creates an GenerationEvent. +func MakeGenerationEvent() Event { + return generationEvent{} +} diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 0121d3281e6b..3dd2b63784cc 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -79,6 +79,8 @@ func ExampleClient() { fmt.Printf("%s->%s@%d\n", kv.Key.String(), string(kv.Value.RawBytes), kv.Value.Timestamp.WallTime) case streamingccl.CheckpointEvent: fmt.Printf("resolved %d\n", event.GetResolved().WallTime) + case streamingccl.GenerationEvent: + fmt.Printf("received generation event") default: panic(fmt.Sprintf("unexpected event type %v", event.Type())) }