Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit c092599
Author: Ti Chi Robot <[email protected]>
Date:   Wed Jun 12 00:26:59 2024 +0800

    pkg/config, sink(ticdc): support output raw change event for mq and cloud storage sink (pingcap#11226) (pingcap#11290)

    close pingcap#11211

commit 3426e46
Author: Ti Chi Robot <[email protected]>
Date:   Tue Jun 11 19:40:29 2024 +0800

    puller(ticdc): fix wrong update splitting behavior after table scheduling (pingcap#11269) (pingcap#11282)

    close pingcap#11219

commit 2a28078
Author: Ti Chi Robot <[email protected]>
Date:   Tue Jun 11 16:40:37 2024 +0800

    mysql(ticdc): remove error filter when check isTiDB in backend init (pingcap#11214) (pingcap#11261)

    close pingcap#11213

commit 2425d54
Author: Ti Chi Robot <[email protected]>
Date:   Tue Jun 11 16:40:30 2024 +0800

    log(ticdc): Add more error query information to the returned error to facilitate users to know the cause of the failure (pingcap#10945) (pingcap#11257)

    close pingcap#11254

commit 053cdaf
Author: Ti Chi Robot <[email protected]>
Date:   Tue Jun 11 15:34:30 2024 +0800

    cdc: log slow conflict detect every 60s (pingcap#11251) (pingcap#11287)

    close pingcap#11271

commit 327ba7b
Author: Ti Chi Robot <[email protected]>
Date:   Tue Jun 11 11:42:00 2024 +0800

    redo(ticdc): return internal error in redo writer (pingcap#11011) (pingcap#11091)

    close pingcap#10124

commit d82ae89
Author: Ti Chi Robot <[email protected]>
Date:   Mon Jun 10 22:28:29 2024 +0800

    ddl_puller (ticdc): handle dorp pk/uk ddl correctly (pingcap#10965) (pingcap#10981)

    close pingcap#10890

commit f15bec9
Author: Ti Chi Robot <[email protected]>
Date:   Fri Jun 7 16:16:28 2024 +0800

    redo(ticdc): enable pprof and set memory limit for redo applier (pingcap#10904) (pingcap#10996)

    close pingcap#10900

commit ba50a0e
Author: Ti Chi Robot <[email protected]>
Date:   Wed Jun 5 19:58:26 2024 +0800

    test(ticdc): enable sequence test (pingcap#11023) (pingcap#11037)

    close pingcap#11015

commit 94b9897
Author: Ti Chi Robot <[email protected]>
Date:   Wed Jun 5 17:08:56 2024 +0800

    mounter(ticdc): timezone fill default value should also consider tz. (pingcap#10932) (pingcap#10946)

    close pingcap#10931

commit a912d33
Author: Ti Chi Robot <[email protected]>
Date:   Wed Jun 5 10:49:25 2024 +0800

    mysql (ticdc): Improve the performance of the mysql sink by refining the transaction event batching logic (pingcap#10466) (pingcap#11242)

    close pingcap#11241

commit 6277d9a
Author: dongmen <[email protected]>
Date:   Wed May 29 20:13:22 2024 +0800

    kvClient (ticdc): revert e5999e3 to remove useless metrics (pingcap#11184)

    close pingcap#11073

commit 54e93ed
Author: dongmen <[email protected]>
Date:   Wed May 29 17:43:22 2024 +0800

    syncpoint (ticdc): make syncpoint support base64 encoded password (pingcap#11162)

    close pingcap#10516

commit 0ba9329
Author: Ti Chi Robot <[email protected]>
Date:   Wed May 29 09:07:21 2024 +0800

    (redo)ticdc: fix the event orderliness in redo log (pingcap#11117) (pingcap#11180)

    close pingcap#11096

Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Jun 12, 2024
1 parent 2d96d8a commit a3d30ca
Show file tree
Hide file tree
Showing 70 changed files with 2,198 additions and 514 deletions.
50 changes: 28 additions & 22 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,18 +354,20 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
SASLOAuthGrantType: c.Sink.KafkaConfig.SASLOAuthGrantType,
SASLOAuthAudience: c.Sink.KafkaConfig.SASLOAuthAudience,
LargeMessageHandle: largeMessageHandle,
OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent,
}
}

if c.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}
}
Expand Down Expand Up @@ -502,18 +504,20 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
SASLOAuthGrantType: cloned.Sink.KafkaConfig.SASLOAuthGrantType,
SASLOAuthAudience: cloned.Sink.KafkaConfig.SASLOAuthAudience,
LargeMessageHandle: largeMessageHandle,
OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent,
}
}

if cloned.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}
}
Expand Down Expand Up @@ -679,18 +683,20 @@ type KafkaConfig struct {
SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"`
SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"`

LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down
9 changes: 9 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,15 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo, tz *time.Location) (types.Da
if err != nil {
return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err)
}
switch col.GetType() {
case mysql.TypeTimestamp:
t := d.GetMysqlTime()
err = t.ConvertTimeZone(time.UTC, tz)
if err != nil {
return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err)
}
d.SetMysqlTime(t)
}
} else if !mysql.HasNotNullFlag(col.GetFlag()) {
// NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx",
// ref: https://github.com/pingcap/ticdc/issues/3929
Expand Down
4 changes: 2 additions & 2 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
sc.TimeZone = tz
expected, err := types.ParseTimeFromFloatString(
sc,
"2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
"2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
require.NoError(t, err)
require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + notnull + default")

Expand All @@ -886,7 +886,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expected, err = types.ParseTimeFromFloatString(
sc,
"2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
"2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
require.NoError(t, err)
require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + null + default")

Expand Down
79 changes: 12 additions & 67 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"io"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -39,7 +38,6 @@ import (
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
tidbkv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -75,8 +73,6 @@ const (
resolveLockMinInterval = 10 * time.Second

scanRegionsConcurrency = 1024

tableMonitorInterval = 2 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -166,7 +162,6 @@ type CDCKVClient interface {
ts uint64,
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
enableTableMonitor bool,
) error

// RegionCount returns the number of captured regions.
Expand Down Expand Up @@ -310,9 +305,8 @@ func (c *CDCClient) EventFeed(
ctx context.Context, span regionspan.ComparableSpan, ts uint64,
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
enableTableMonitor bool,
) error {
s := newEventFeedSession(c, span, lockResolver, ts, eventCh, enableTableMonitor)
s := newEventFeedSession(c, span, lockResolver, ts, eventCh)
return s.eventFeed(ctx)
}

Expand Down Expand Up @@ -396,11 +390,6 @@ type eventFeedSession struct {

rangeLock *regionspan.RegionRangeLock

enableTableMonitor bool
regionChSizeGauge prometheus.Gauge
errChSizeGauge prometheus.Gauge
rangeChSizeGauge prometheus.Gauge

// storeStreamsCache is used to cache the established gRPC streams to TiKV stores.
// Note: The cache is not thread-safe, so it should be accessed in the same goroutine.
// For now, it is only accessed in the `requestRegionToStore` goroutine.
Expand All @@ -421,31 +410,23 @@ func newEventFeedSession(
lockResolver txnutil.LockResolver,
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
enableTableMonitor bool,
) *eventFeedSession {
id := allocateRequestID()
rangeLock := regionspan.NewRegionRangeLock(
id, totalSpan.Start, totalSpan.End, startTs,
client.changefeed.Namespace+"."+client.changefeed.ID)

return &eventFeedSession{
client: client,
startTs: startTs,
changefeed: client.changefeed,
tableID: client.tableID,
tableName: client.tableName,
storeStreamsCache: make(map[string]*eventFeedStream),
totalSpan: totalSpan,
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
enableTableMonitor: enableTableMonitor,
regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"),
errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"),
client: client,
startTs: startTs,
changefeed: client.changefeed,
tableID: client.tableID,
tableName: client.tableName,
storeStreamsCache: make(map[string]*eventFeedStream),
totalSpan: totalSpan,
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
resolvedTsPool: sync.Pool{
New: func() any {
return &regionStatefulEvent{
Expand Down Expand Up @@ -486,7 +467,6 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case task := <-s.requestRangeCh.Out():
s.rangeChSizeGauge.Dec()
// divideAndSendEventFeedToRegions could be blocked for some time,
// since it must wait for the region lock available. In order to
// consume region range request from `requestRangeCh` as soon as
Expand All @@ -508,7 +488,6 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case errInfo := <-s.errCh.Out():
s.errChSizeGauge.Dec()
if err := s.handleError(ctx, errInfo); err != nil {
return err
}
Expand All @@ -518,7 +497,6 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
})

s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan}
s.rangeChSizeGauge.Inc()

log.Info("event feed started",
zap.String("namespace", s.changefeed.Namespace),
Expand All @@ -539,7 +517,6 @@ func (s *eventFeedSession) scheduleDivideRegionAndRequest(
task := rangeRequestTask{span: span}
select {
case s.requestRangeCh.In() <- task:
s.rangeChSizeGauge.Inc()
case <-ctx.Done():
}
}
Expand All @@ -553,7 +530,6 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
sri.lockedRange = res.LockedRange
select {
case s.regionCh.In() <- sri:
s.regionChSizeGauge.Inc()
case <-ctx.Done():
}
case regionspan.LockRangeStatusStale:
Expand Down Expand Up @@ -615,7 +591,6 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr
zap.Error(errorInfo.err))
select {
case s.errCh.In() <- errorInfo:
s.errChSizeGauge.Inc()
case <-ctx.Done():
}
}
Expand Down Expand Up @@ -791,7 +766,6 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case sri = <-s.regionCh.Out():
s.regionChSizeGauge.Dec()
}

// Send a resolved ts to event channel first, for two reasons:
Expand Down Expand Up @@ -1040,11 +1014,6 @@ func (s *eventFeedSession) receiveFromStream(

metricSendEventBatchResolvedSize := batchResolvedEventSize.
WithLabelValues(s.changefeed.Namespace, s.changefeed.ID)
metricReceiveBusyRatio := workerBusyRatio.WithLabelValues(
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-receiver")
metricProcessBusyRatio := workerBusyRatio.WithLabelValues(
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-processor")

// always create a new region worker, because `receiveFromStream` is ensured
// to call exactly once from outer code logic
worker := newRegionWorker(parentCtx, stream, s)
Expand All @@ -1063,7 +1032,7 @@ func (s *eventFeedSession) receiveFromStream(

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
err := handleExit(worker.run(s.enableTableMonitor))
err := handleExit(worker.run())
if err != nil {
log.Error("region worker exited with error",
zap.String("namespace", s.changefeed.Namespace),
Expand All @@ -1079,32 +1048,10 @@ func (s *eventFeedSession) receiveFromStream(
})

receiveEvents := func() error {
var receiveTime time.Duration
var processTime time.Duration
startToWork := time.Now()

maxCommitTs := model.Ts(0)
for {
startToReceive := time.Now()
cevent, err := stream.client.Recv()

if s.enableTableMonitor {
receiveTime += time.Since(startToReceive)
if time.Since(startToWork) >= tableMonitorInterval {
now := time.Now()
// Receive busyRatio indicates the blocking time (receive and decode grpc msg) of the worker.
busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100
metricReceiveBusyRatio.Set(busyRatio)
receiveTime = 0
// Process busyRatio indicates the working time (dispatch to region worker) of the worker.
busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100
metricProcessBusyRatio.Set(busyRatio)
processTime = 0

startToWork = now
}
}

failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) {
if op.(string) == "error" {
_ = worker.sendEvents(ctx, []*regionStatefulEvent{nil})
Expand Down Expand Up @@ -1163,7 +1110,6 @@ func (s *eventFeedSession) receiveFromStream(
return nil
}

startToProcess := time.Now()
size := cevent.Size()
if size > warnRecvMsgSizeThreshold {
regionCount := 0
Expand Down Expand Up @@ -1208,7 +1154,6 @@ func (s *eventFeedSession) receiveFromStream(
tsStat.commitTs.Store(maxCommitTs)
}
}
processTime += time.Since(startToProcess)
}
}
eg.Go(func() error {
Expand Down
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
go func() {
err := cdcClient.EventFeed(ctx,
regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
100, lockResolver, eventCh, false)
100, lockResolver, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
Expand Down Expand Up @@ -296,7 +296,7 @@ func prepareBench(b *testing.B, regionNum int) (
go func() {
err := cdcClient.EventFeed(ctx,
regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")},
100, lockResolver, eventCh, false)
100, lockResolver, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
Expand Down
Loading

0 comments on commit a3d30ca

Please sign in to comment.