Skip to content

Commit

Permalink
rename batch to mode
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed May 17, 2022
1 parent 85d6bb5 commit 458705f
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
27 changes: 19 additions & 8 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type PolymorphicEvent struct {
// Commit or resolved TS
CRTs uint64
// Identify whether the resolved event is in batch mode.
Batch bool
Mode ResolvedMode

RawKV *RawKVEntry
Row *RowChangedEvent
Expand Down Expand Up @@ -68,7 +68,7 @@ func (e *PolymorphicEvent) IsResolved() bool {

// IsBatchResolved returns true if the event is batch resolved event.
func (e *PolymorphicEvent) IsBatchResolved() bool {
return e.IsResolved() && e.Batch
return e.IsResolved() && e.Mode == BatchResolvedMode
}

// ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order.
Expand All @@ -94,18 +94,29 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
return i.CRTs < j.CRTs
}

type ResolvedMode int

const (
// With `ResolvedTs.Mode == NormalResolvedMode`, TiCDC guarantees that all events whose commitTs
// is less than or equal to `resolved.Ts` are sent to Sink.
NormalResolvedMode ResolvedMode = iota
// With `ResolvedTs.Mode == BatchResolvedMode`, TiCDC guarantees that all events whose commitTs
// is less than 'resolved.Ts' are sent to Sink.
BatchResolvedMode
)

// ResolvedTs is the resolved timestamp of sink module.
type ResolvedTs struct {
Ts uint64
Batch bool
Ts uint64
Mode ResolvedMode
}

// NewResolvedTs creates a new ResolvedTs.
func NewResolvedTs(t uint64) ResolvedTs {
return ResolvedTs{Ts: t, Batch: false}
return ResolvedTs{Ts: t, Mode: NormalResolvedMode}
}

// NewBatchResolvedTs creates a ResolvedTs with a given batch type.
func NewBatchResolvedTs(t uint64, b bool) ResolvedTs {
return ResolvedTs{Ts: t, Batch: b}
// NewResolvedTsWithMode creates a ResolvedTs with a given batch type.
func NewResolvedTsWithMode(t uint64, m ResolvedMode) ResolvedTs {
return ResolvedTs{Ts: t, Mode: m}
}
4 changes: 4 additions & 0 deletions cdc/model/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package model

import (
"fmt"
"testing"
"unsafe"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -43,4 +45,6 @@ func TestPolymorphicEvent(t *testing.T) {
require.Equal(t, rawResolved, polyEvent.RawKV)
require.Equal(t, resolved.CRTs, polyEvent.CRTs)
require.Equal(t, uint64(0), polyEvent.StartTs)
fmt.Println(unsafe.Sizeof(PolymorphicEvent{}))
fmt.Println(true && 1 == 1)
}
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})

resolved := model.NewBatchResolvedTs(event.CRTs, event.Batch)
resolved := model.NewResolvedTsWithMode(event.CRTs, event.Mode)
if err := n.flushSink(ctx, resolved); err != nil {
return false, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type Sink interface {

// FlushRowChangedEvents flushes each row which of commitTs less than or
// equal to `resolved.Ts` into downstream.
// With `resolved.Batch == false`, TiCDC guarantees that all the Events whose commitTs
// is less than or equal to `resolved.Ts` are sent to Sink through `EmitRowChangedEvents`.
// With `resolved.Batch == true`, TiCDC guarantees that all events whose commitTs
// 1. With `resolved.Mode == NormalResolvedMode`, TiCDC guarantees that all events whose commitTs
// is less than or equal to `resolved.Ts` are sent to Sink.
// 2. With `resolved.Mode == BatchResolvedMode`, TiCDC guarantees that all events whose commitTs
// is less than 'resolved.Ts' are sent to Sink.
//
// FlushRowChangedEvents is thread-safe.
Expand Down

0 comments on commit 458705f

Please sign in to comment.