From 223283c2822ceeedae2d5a1b47a15c38454717a0 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 26 Feb 2024 22:27:29 +0800 Subject: [PATCH] sink(ticdc): use pd clock in storage sink (#10351) (#10520) close pingcap/tiflow#10352, ref pingcap/tiflow#10374 --- cdc/api/v1/validator.go | 4 +- cdc/api/v2/api_helpers.go | 4 +- cdc/processor/sinkmanager/manager.go | 2 +- .../cloudstorage/cloud_storage_dml_sink.go | 6 +- .../cloud_storage_dml_sink_test.go | 19 +++--- cdc/sink/dmlsink/cloudstorage/dml_worker.go | 6 +- .../dmlsink/cloudstorage/dml_worker_test.go | 4 +- cdc/sink/dmlsink/factory/factory.go | 4 +- cdc/sink/validator/validator.go | 8 ++- cdc/sink/validator/validator_test.go | 8 +-- cmd/kafka-consumer/main.go | 1 + cmd/storage-consumer/main.go | 1 + errors.toml | 5 ++ pkg/applier/redo.go | 2 +- pkg/errors/cdc_errors.go | 4 ++ pkg/pdutil/clock.go | 22 +++++++ pkg/sink/cloudstorage/path.go | 59 ++++++++++++------- pkg/sink/cloudstorage/path_test.go | 14 +++-- 18 files changed, 120 insertions(+), 53 deletions(-) diff --git a/cdc/api/v1/validator.go b/cdc/api/v1/validator.go index 549d76d5cd0..b4729e52649 100644 --- a/cdc/api/v1/validator.go +++ b/cdc/api/v1/validator.go @@ -180,7 +180,7 @@ func verifyCreateChangefeedConfig( return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone)) } ctx = contextutil.PutTimezoneInCtx(ctx, tz) - if err := validator.Validate(ctx, info.SinkURI, info.Config); err != nil { + if err := validator.Validate(ctx, info.SinkURI, info.Config, up.PDClock); err != nil { return nil, err } @@ -238,7 +238,7 @@ func VerifyUpdateChangefeedConfig(ctx context.Context, return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) } - if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil { + if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config, nil); err != nil { return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) } } diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 86dd77703d8..2e8edf79332 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -228,7 +228,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig( } // verify sink - if err := validator.Validate(ctx, cfg.SinkURI, replicaCfg); err != nil { + if err := validator.Validate(ctx, cfg.SinkURI, replicaCfg, nil); err != nil { return nil, err } @@ -352,7 +352,7 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig( return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) } - if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil { + if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config, nil); err != nil { return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) } } diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index c2765e72af6..a5534e9f2e1 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -351,7 +351,7 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) { return m.sinkFactory.errors, m.sinkFactory.version } - m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors) + m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors, m.up.PDClock) if err != nil { emitError(err) return m.sinkFactory.errors, m.sinkFactory.version diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 1069ef4a1ce..c0fa549752c 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -28,10 +28,10 @@ import ( "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" "github.com/pingcap/tiflow/cdc/sink/util" - "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/builder" @@ -94,6 +94,7 @@ type DMLSink struct { // NewDMLSink creates a cloud storage sink. func NewDMLSink(ctx context.Context, + pdClock pdutil.Clock, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, errCh chan error, @@ -153,11 +154,10 @@ func NewDMLSink(ctx context.Context, // create defragmenter. s.defragmenter = newDefragmenter(encodedCh, workerChannels) // create a group of dml workers. - clock := clock.New() for i := 0; i < cfg.WorkerCount; i++ { inputCh := chann.NewAutoDrainChann[eventFragment]() s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext, - inputCh, clock, s.statistics) + inputCh, pdClock, s.statistics) workerChannels[i] = inputCh } diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go index 9ef1fa8e42a..362a3c816ff 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go @@ -31,12 +31,13 @@ import ( "github.com/pingcap/tiflow/cdc/sink/tablesink/state" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/stretchr/testify/require" ) func setClock(s *DMLSink, clock clock.Clock) { for _, w := range s.workers { - w.filePathGenerator.SetClock(clock) + w.filePathGenerator.SetClock(pdutil.NewMonotonicClock(clock)) } } @@ -126,7 +127,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { replicaConfig.Sink.Protocol = config.ProtocolCsv.String() replicaConfig.Sink.FileIndexWidth = 6 errCh := make(chan error, 5) - s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) + s, err := NewDMLSink(ctx, pdutil.NewMonotonicClock(clock.New()), sinkURI, replicaConfig, errCh) require.Nil(t, err) var cnt uint64 = 0 batch := 100 @@ -194,10 +195,11 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { replicaConfig.Sink.FileIndexWidth = 6 errCh := make(chan error, 5) - s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) - require.Nil(t, err) mockClock := clock.NewMock() - setClock(s, mockClock) + s, err := NewDMLSink(ctx, + pdutil.NewMonotonicClock(mockClock), + sinkURI, replicaConfig, errCh) + require.Nil(t, err) var cnt uint64 = 0 batch := 100 @@ -268,11 +270,12 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { // test table is scheduled from one node to another cnt = 0 ctx, cancel = context.WithCancel(context.Background()) - s, err = NewDMLSink(ctx, sinkURI, replicaConfig, errCh) - require.Nil(t, err) mockClock = clock.NewMock() mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC)) - setClock(s, mockClock) + s, err = NewDMLSink(ctx, + pdutil.NewMonotonicClock(mockClock), + sinkURI, replicaConfig, errCh) + require.Nil(t, err) err = s.WriteEvents(txns...) require.Nil(t, err) diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index 07e753c6c2c..e6970cf6c9c 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -24,9 +24,9 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/metrics" mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage" - "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/prometheus/client_golang/prometheus" @@ -103,7 +103,7 @@ func newDMLWorker( config *cloudstorage.Config, extension string, inputCh *chann.DrainableChann[eventFragment], - clock clock.Clock, + pdClock pdutil.Clock, statistics *metrics.Statistics, ) *dmlWorker { d := &dmlWorker{ @@ -114,7 +114,7 @@ func newDMLWorker( inputCh: inputCh, flushNotifyCh: make(chan dmlTask, 64), statistics: statistics, - filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock), + filePathGenerator: cloudstorage.NewFilePathGenerator(changefeedID, config, storage, extension, pdClock), metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge. WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricFileCount: mcloudstorage.CloudStorageFileCountGauge. diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go index 2e8dd5620c5..d964421fcfb 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -51,8 +52,9 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker { require.Nil(t, err) statistics := metrics.NewStatistics(ctx, sink.TxnSink) + pdlock := pdutil.NewMonotonicClock(clock.New()) d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage, - cfg, ".json", chann.NewAutoDrainChann[eventFragment](), clock.New(), statistics) + cfg, ".json", chann.NewAutoDrainChann[eventFragment](), pdlock, statistics) return d } diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index e037d1c8916..efa6730a516 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/tablesink" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/kafka" v2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2" @@ -67,6 +68,7 @@ func New( sinkURIStr string, cfg *config.ReplicaConfig, errCh chan error, + pdClock pdutil.Clock, ) (*SinkFactory, error) { sinkURI, err := url.Parse(sinkURIStr) if err != nil { @@ -96,7 +98,7 @@ func New( s.txnSink = mqs s.category = CategoryMQ case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: - storageSink, err := cloudstorage.NewDMLSink(ctx, sinkURI, cfg, errCh) + storageSink, err := cloudstorage.NewDMLSink(ctx, pdClock, sinkURI, cfg, errCh) if err != nil { return nil, err } diff --git a/cdc/sink/validator/validator.go b/cdc/sink/validator/validator.go index 302ef8012f5..f59a73b8563 100644 --- a/cdc/sink/validator/validator.go +++ b/cdc/sink/validator/validator.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/util" @@ -30,7 +31,10 @@ import ( // Validate sink if given valid parameters. // TODO: For now, we create a real sink instance and validate it. // Maybe we should support the dry-run mode to validate sink. -func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) error { +func Validate(ctx context.Context, + sinkURI string, cfg *config.ReplicaConfig, + pdClock pdutil.Clock, +) error { uri, err := preCheckSinkURI(sinkURI) if err != nil { return err @@ -48,7 +52,7 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) er } ctx, cancel := context.WithCancel(contextutil.PutRoleInCtx(ctx, util.RoleClient)) - s, err := factory.New(ctx, sinkURI, cfg, make(chan error)) + s, err := factory.New(ctx, sinkURI, cfg, make(chan error), pdClock) if err != nil { cancel() return err diff --git a/cdc/sink/validator/validator_test.go b/cdc/sink/validator/validator_test.go index 38b35e3669e..791b0e30b0d 100644 --- a/cdc/sink/validator/validator_test.go +++ b/cdc/sink/validator/validator_test.go @@ -99,26 +99,26 @@ func TestValidateSink(t *testing.T) { // test sink uri error sinkURI := "mysql://root:111@127.0.0.1:3306/" - err := Validate(ctx, sinkURI, replicateConfig) + err := Validate(ctx, sinkURI, replicateConfig, nil) require.NotNil(t, err) require.Contains(t, err.Error(), "fail to open MySQL connection") // test sink uri right sinkURI = "blackhole://" - err = Validate(ctx, sinkURI, replicateConfig) + err = Validate(ctx, sinkURI, replicateConfig, nil) require.Nil(t, err) // test bdr mode error replicateConfig.BDRMode = true sinkURI = "blackhole://" - err = Validate(ctx, sinkURI, replicateConfig) + err = Validate(ctx, sinkURI, replicateConfig, nil) require.NotNil(t, err) require.Contains(t, err.Error(), "sink uri scheme is not supported in BDR mode") // test sink-scheme/syncpoint error replicateConfig.EnableSyncPoint = true sinkURI = "kafka://" - err = Validate(ctx, sinkURI, replicateConfig) + err = Validate(ctx, sinkURI, replicateConfig, nil) require.NotNil(t, err) require.Contains( t, err.Error(), diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 797e4609c45..933f98b92e3 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -455,6 +455,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { downstreamURIStr, config.GetDefaultReplicaConfig(), errChan, + nil, ) if err != nil { cancel() diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 3e881e79087..8e87d368c93 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -191,6 +191,7 @@ func newConsumer(ctx context.Context) (*consumer, error) { downstreamURIStr, config.GetDefaultReplicaConfig(), errCh, + nil, ) if err != nil { log.Error("failed to create event sink factory", zap.Error(err)) diff --git a/errors.toml b/errors.toml index 51f824f7427..4b1036675c7 100755 --- a/errors.toml +++ b/errors.toml @@ -346,6 +346,11 @@ error = ''' incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri ''' +["CDC:ErrInternalCheckFailed"] +error = ''' +internal check failed, %s +''' + ["CDC:ErrInternalServerError"] error = ''' internal server error diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 32fb55cabcd..51bb8c8924a 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -122,7 +122,7 @@ func (ra *RedoApplier) catchError(ctx context.Context) error { func (ra *RedoApplier) initSink(ctx context.Context) (err error) { replicaConfig := config.GetDefaultReplicaConfig() - ra.sinkFactory, err = dmlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig, ra.errCh) + ra.sinkFactory, err = dmlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig, ra.errCh, nil) if err != nil { return err } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 5c2e72145b6..a0b5afb1485 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -872,6 +872,10 @@ var ( "invalid replica config, %s", errors.RFCCodeText("CDC:ErrInvalidReplicaConfig"), ) + ErrInternalCheckFailed = errors.Normalize( + "internal check failed, %s", + errors.RFCCodeText("CDC:ErrInternalCheckFailed"), + ) ErrHandleDDLFailed = errors.Normalize( "handle ddl failed, job: %s, query: %s, startTs: %d. "+ diff --git a/pkg/pdutil/clock.go b/pkg/pdutil/clock.go index ee7edec660b..3e7e1969a2f 100644 --- a/pkg/pdutil/clock.go +++ b/pkg/pdutil/clock.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + pclock "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/retry" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -141,3 +142,24 @@ func (c *clock4Test) Run(ctx context.Context) { func (c *clock4Test) Stop() { } + +type monotonicClock struct { + clock pclock.Clock +} + +// NewMonotonicClock return a new monotonic clock. +func NewMonotonicClock(pClock pclock.Clock) Clock { + return &monotonicClock{ + clock: pClock, + } +} + +func (c *monotonicClock) CurrentTime() (time.Time, error) { + return c.clock.Now(), nil +} + +func (c *monotonicClock) Run(ctx context.Context) { +} + +func (c *monotonicClock) Stop() { +} diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index dae3f5a8a0c..db48fe5a120 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -133,11 +134,12 @@ type VersionedTableName struct { // FilePathGenerator is used to generate data file path and index file path. type FilePathGenerator struct { - extension string - config *Config - clock clock.Clock - storage storage.ExternalStorage - fileIndex map[VersionedTableName]*indexWithDate + changefeedID model.ChangeFeedID + extension string + config *Config + pdClock pdutil.Clock + storage storage.ExternalStorage + fileIndex map[VersionedTableName]*indexWithDate hasher *hash.PositionInertia versionMap map[VersionedTableName]uint64 @@ -145,19 +147,27 @@ type FilePathGenerator struct { // NewFilePathGenerator creates a FilePathGenerator. func NewFilePathGenerator( + changefeedID model.ChangeFeedID, config *Config, storage storage.ExternalStorage, extension string, - clock clock.Clock, + pdclock pdutil.Clock, ) *FilePathGenerator { + if pdclock == nil { + pdclock = pdutil.NewMonotonicClock(clock.New()) + log.Warn("pd clock is not set in storage sink, use local clock instead", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeedID", changefeedID.ID)) + } return &FilePathGenerator{ - config: config, - extension: extension, - storage: storage, - clock: clock, - fileIndex: make(map[VersionedTableName]*indexWithDate), - hasher: hash.NewPositionInertia(), - versionMap: make(map[VersionedTableName]uint64), + changefeedID: changefeedID, + config: config, + extension: extension, + storage: storage, + pdClock: pdclock, + fileIndex: make(map[VersionedTableName]*indexWithDate), + hasher: hash.NewPositionInertia(), + versionMap: make(map[VersionedTableName]uint64), } } @@ -176,8 +186,12 @@ func (f *FilePathGenerator) CheckOrWriteSchema( def.FromTableInfo(tableInfo, table.TableInfoVersion) if !def.IsTableSchema() { // only check schema for table - log.Panic("invalid table schema", zap.Any("versionedTableName", table), + log.Error("invalid table schema", + zap.String("namespace", f.changefeedID.Namespace), + zap.String("changefeedID", f.changefeedID.ID), + zap.Any("versionedTableName", table), zap.Any("tableInfo", tableInfo)) + return errors.ErrInternalCheckFailed.GenWithStackByArgs("invalid table schema in FilePathGenerator") } // Case 1: point check if the schema file exists. @@ -210,10 +224,13 @@ func (f *FilePathGenerator) CheckOrWriteSchema( } version, parsedChecksum := mustParseSchemaName(path) if parsedChecksum != checksum { - // TODO: parsedChecksum should be ignored, remove this panic - // after the new path protocol is verified. - log.Panic("invalid schema file name", + log.Error("invalid schema file name", + zap.String("namespace", f.changefeedID.Namespace), + zap.String("changefeedID", f.changefeedID.ID), zap.String("path", path), zap.Any("checksum", checksum)) + errMsg := fmt.Sprintf("invalid schema filename in storage sink, "+ + "expected checksum: %d, actual checksum: %d", checksum, parsedChecksum) + return errors.ErrInternalCheckFailed.GenWithStackByArgs(errMsg) } if version > lastVersion { lastVersion = version @@ -235,6 +252,8 @@ func (f *FilePathGenerator) CheckOrWriteSchema( // b. the schema file is deleted by the consumer. We write schema file to external storage too. if schemaFileCnt != 0 && lastVersion == 0 { log.Warn("no table schema file found in an non-empty meta path", + zap.String("namespace", f.changefeedID.Namespace), + zap.String("changefeedID", f.changefeedID.ID), zap.Any("versionedTableName", table), zap.Uint32("checksum", checksum)) } @@ -247,8 +266,8 @@ func (f *FilePathGenerator) CheckOrWriteSchema( } // SetClock is used for unit test -func (f *FilePathGenerator) SetClock(clock clock.Clock) { - f.clock = clock +func (f *FilePathGenerator) SetClock(pdClock pdutil.Clock) { + f.pdClock = pdClock } // GenerateDateStr generates a date string base on current time @@ -256,7 +275,7 @@ func (f *FilePathGenerator) SetClock(clock clock.Clock) { func (f *FilePathGenerator) GenerateDateStr() string { var dateStr string - currTime := f.clock.Now() + currTime, _ := f.pdClock.CurrentTime() switch f.config.DateSeparator { case config.DateSeparatorYear.String(): dateStr = currTime.Format("2006") diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index f8650441626..9ad360aa527 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -49,7 +50,7 @@ func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FileP err = cfg.Apply(ctx, sinkURI, replicaConfig) require.NoError(t, err) - f := NewFilePathGenerator(cfg, storage, ".json", clock.New()) + f := NewFilePathGenerator(model.ChangeFeedID{}, cfg, storage, ".json", pdutil.NewMonotonicClock(clock.New())) return f } @@ -84,7 +85,7 @@ func TestGenerateDataFilePath(t *testing.T) { f = testFilePathGenerator(ctx, t, dir) f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorYear.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock(mockClock)) mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) date = f.GenerateDateStr() path, err = f.GenerateDataFilePath(ctx, table, date) @@ -108,7 +109,8 @@ func TestGenerateDataFilePath(t *testing.T) { f = testFilePathGenerator(ctx, t, dir) f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorMonth.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock(mockClock)) + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) date = f.GenerateDateStr() path, err = f.GenerateDataFilePath(ctx, table, date) @@ -132,7 +134,8 @@ func TestGenerateDataFilePath(t *testing.T) { f = testFilePathGenerator(ctx, t, dir) f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorDay.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock(mockClock)) + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) date = f.GenerateDateStr() path, err = f.GenerateDataFilePath(ctx, table, date) @@ -210,7 +213,8 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) { f := testFilePathGenerator(ctx, t, dir) mockClock := clock.NewMock() f.config.DateSeparator = config.DateSeparatorDay.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock(mockClock)) + mockClock.Set(time.Date(2023, 3, 9, 23, 59, 59, 0, time.UTC)) table := VersionedTableName{ TableNameWithPhysicTableID: model.TableName{