Skip to content

Commit

Permalink
streamingccl: change Event to an interface
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
pbardea authored and adityamaru committed Dec 30, 2020
1 parent 57dc7de commit 60bf11a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 31 deletions.
93 changes: 63 additions & 30 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,80 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// YYY: This is nearly identical to the KV feed events we emit. These events
// will most likely eventually be protobufs to serialize the entire event over
// the stream connection.

// EventType enumerates all possible events emitted over a cluster stream.
type EventType int

const (
// KVEvent indicates that the KV field of an event holds an updated KV which
// needs to be ingested.
KVEvent EventType = iota
// ResolvedEvent indicates that the Resolved field of an event holds a
// timestamp indicating that all events for this partition up to this
// timestamp have been emitted.
ResolvedEvent
// CheckpointEvent indicates that GetResolved will be meaningful. The resolved
// timestamp indicates that all KVs have been emitted up to this timestamp.
CheckpointEvent
)

// Event describes an event emitted by a cluster to cluster stream.
// Its EventType field indicates which other fields are meaningful.
type Event struct {
// EventType indicates which fields on the event will be meaningful.
EventType EventType
// Event describes an event emitted by a cluster to cluster stream. Its Type
// field indicates which other fields are meaningful.
type Event interface {
// Type specifies which accessor will be meaningful.
Type() EventType

// GetKV returns a KV event if the EventType is KVEvent.
GetKV() *roachpb.KeyValue
// GetResolved returns a resolved timestamp if the EventType is ResolvedEvent.
// The resolved timestamp indicates that all KV events until this time have
// been emitted.
GetResolved() *time.Time
}

type kvEvent struct {
kv roachpb.KeyValue
}

var _ Event = kvEvent{}

// KV is populated when eventType is KVEvent.
KV roachpb.KeyValue
// Resolved is populated when eventType is ResolvedEvent.
Resolved time.Time
// Type implements the Event interface.
func (kve kvEvent) Type() EventType {
return KVEvent
}

// MakeKVEvent creates a KV typed Event.
// GetKV implements the Event interface.
func (kve kvEvent) GetKV() *roachpb.KeyValue {
return &kve.kv
}

// GetResolved implements the Event interface.
func (kve kvEvent) GetResolved() *time.Time {
return nil
}

type checkpointEvent struct {
resolvedTimestamp time.Time
}

var _ Event = checkpointEvent{}

// Type implements the Event interface.
func (ce checkpointEvent) Type() EventType {
return CheckpointEvent
}

// GetKV implements the Event interface.
func (ce checkpointEvent) GetKV() *roachpb.KeyValue {
return nil
}

// GetResolved implements the Event interface.
func (ce checkpointEvent) GetResolved() *time.Time {
return &ce.resolvedTimestamp
}

// MakeKVEvent creates an Event from a KV.
func MakeKVEvent(kv roachpb.KeyValue) Event {
return Event{
EventType: KVEvent,
KV: kv,
}
}

// MakeResolvedEvent makes a resolved timestamp Event.
func MakeResolvedEvent(resolvedTimestamp time.Time) Event {
return Event{
EventType: ResolvedEvent,
Resolved: resolvedTimestamp,
}
return kvEvent{kv: kv}
}

// MakeCheckpointEvent creates an Event from a resolved timestamp.
func MakeCheckpointEvent(resolvedTimestamp time.Time) Event {
return checkpointEvent{resolvedTimestamp: resolvedTimestamp}
}
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (sc mockStreamClient) ConsumePartition(

events := make(chan streamingccl.Event, 100)
events <- streamingccl.MakeKVEvent(sampleKV)
events <- streamingccl.MakeResolvedEvent(timeutil.Now())
events <- streamingccl.MakeCheckpointEvent(timeutil.Now())
close(events)

return events, nil
Expand Down

0 comments on commit 60bf11a

Please sign in to comment.