From 60bf11ad87258b61ab75c24f84ec5793b23e5e09 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Sun, 20 Dec 2020 19:42:25 -0500 Subject: [PATCH] streamingccl: change Event to an interface Release note: None --- pkg/ccl/streamingccl/event.go | 93 +++++++++++++------ .../streamclient/stream_client_test.go | 2 +- 2 files changed, 64 insertions(+), 31 deletions(-) diff --git a/pkg/ccl/streamingccl/event.go b/pkg/ccl/streamingccl/event.go index 6cc2880401ef..3f78259bdb91 100644 --- a/pkg/ccl/streamingccl/event.go +++ b/pkg/ccl/streamingccl/event.go @@ -14,10 +14,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" ) -// YYY: This is nearly identical to the KV feed events we emit. These events -// will most likely eventually be protobufs to serialize the entire event over -// the stream connection. - // EventType enumerates all possible events emitted over a cluster stream. type EventType int @@ -25,36 +21,73 @@ const ( // KVEvent indicates that the KV field of an event holds an updated KV which // needs to be ingested. KVEvent EventType = iota - // ResolvedEvent indicates that the Resolved field of an event holds a - // timestamp indicating that all events for this partition up to this - // timestamp have been emitted. - ResolvedEvent + // CheckpointEvent indicates that GetResolved will be meaningful. The resolved + // timestamp indicates that all KVs have been emitted up to this timestamp. + CheckpointEvent ) -// Event describes an event emitted by a cluster to cluster stream. -// Its EventType field indicates which other fields are meaningful. -type Event struct { - // EventType indicates which fields on the event will be meaningful. - EventType EventType +// Event describes an event emitted by a cluster to cluster stream. Its Type +// field indicates which other fields are meaningful. +type Event interface { + // Type specifies which accessor will be meaningful. + Type() EventType + + // GetKV returns a KV event if the EventType is KVEvent. + GetKV() *roachpb.KeyValue + // GetResolved returns a resolved timestamp if the EventType is ResolvedEvent. + // The resolved timestamp indicates that all KV events until this time have + // been emitted. + GetResolved() *time.Time +} + +type kvEvent struct { + kv roachpb.KeyValue +} + +var _ Event = kvEvent{} - // KV is populated when eventType is KVEvent. - KV roachpb.KeyValue - // Resolved is populated when eventType is ResolvedEvent. - Resolved time.Time +// Type implements the Event interface. +func (kve kvEvent) Type() EventType { + return KVEvent } -// MakeKVEvent creates a KV typed Event. +// GetKV implements the Event interface. +func (kve kvEvent) GetKV() *roachpb.KeyValue { + return &kve.kv +} + +// GetResolved implements the Event interface. +func (kve kvEvent) GetResolved() *time.Time { + return nil +} + +type checkpointEvent struct { + resolvedTimestamp time.Time +} + +var _ Event = checkpointEvent{} + +// Type implements the Event interface. +func (ce checkpointEvent) Type() EventType { + return CheckpointEvent +} + +// GetKV implements the Event interface. +func (ce checkpointEvent) GetKV() *roachpb.KeyValue { + return nil +} + +// GetResolved implements the Event interface. +func (ce checkpointEvent) GetResolved() *time.Time { + return &ce.resolvedTimestamp +} + +// MakeKVEvent creates an Event from a KV. func MakeKVEvent(kv roachpb.KeyValue) Event { - return Event{ - EventType: KVEvent, - KV: kv, - } -} - -// MakeResolvedEvent makes a resolved timestamp Event. -func MakeResolvedEvent(resolvedTimestamp time.Time) Event { - return Event{ - EventType: ResolvedEvent, - Resolved: resolvedTimestamp, - } + return kvEvent{kv: kv} +} + +// MakeCheckpointEvent creates an Event from a resolved timestamp. +func MakeCheckpointEvent(resolvedTimestamp time.Time) Event { + return checkpointEvent{resolvedTimestamp: resolvedTimestamp} } diff --git a/pkg/ccl/streamingccl/streamclient/stream_client_test.go b/pkg/ccl/streamingccl/streamclient/stream_client_test.go index 47a56c76072a..5e1753666124 100644 --- a/pkg/ccl/streamingccl/streamclient/stream_client_test.go +++ b/pkg/ccl/streamingccl/streamclient/stream_client_test.go @@ -45,7 +45,7 @@ func (sc mockStreamClient) ConsumePartition( events := make(chan streamingccl.Event, 100) events <- streamingccl.MakeKVEvent(sampleKV) - events <- streamingccl.MakeResolvedEvent(timeutil.Now()) + events <- streamingccl.MakeCheckpointEvent(timeutil.Now()) close(events) return events, nil