diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 7c0e0101c1b..f23f45fb896 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -281,6 +281,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( Terminator: c.Sink.Terminator, DateSeparator: c.Sink.DateSeparator, EnablePartitionSeparator: c.Sink.EnablePartitionSeparator, + FileIndexWidth: c.Sink.FileIndexWidth, EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2, OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns, } @@ -400,6 +401,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { Terminator: cloned.Sink.Terminator, DateSeparator: cloned.Sink.DateSeparator, EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator, + FileIndexWidth: cloned.Sink.FileIndexWidth, EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2, OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns, } @@ -548,6 +550,7 @@ type SinkConfig struct { EnablePartitionSeparator bool `json:"enable_partition_separator"` + FileIndexWidth int `json:"file_index_width"` EnableKafkaSinkV2 bool `json:"enable_kafka_sink_v_2"` OnlyOutputUpdatedColumns bool `json:"only_output_updated_columns"` } // CSVConfig denotes the csv config diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go index cdd00e0934e..4c01b042e2f 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go @@ -97,7 +97,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { replicaConfig := config.GetDefaultReplicaConfig() replicaConfig.Sink.Protocol = config.ProtocolCsv.String() - + replicaConfig.Sink.FileIndexWidth = 6 errCh := make(chan error, 5) s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) require.Nil(t, err) @@ -172,6 +172,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { replicaConfig := config.GetDefaultReplicaConfig() replicaConfig.Sink.Protocol = config.ProtocolCsv.String() replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String() + replicaConfig.Sink.FileIndexWidth = 6 errCh := make(chan error, 5) s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) diff --git
a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go index ca47ea853c5..250dc4b5877 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go @@ -46,6 +46,7 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker { require.Nil(t, err) cfg := cloudstorage.NewConfig() err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig()) + cfg.FileIndexWidth = 6 require.Nil(t, err) statistics := metrics.NewStatistics(ctx, sink.TxnSink) diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index f993be666db..fe59fb24871 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -62,6 +62,7 @@ var ( logFile string logLevel string flushInterval time.Duration + fileIndexWidth int enableProfiling bool timezone string ) @@ -79,6 +80,8 @@ func init() { flag.StringVar(&logFile, "log-file", "", "log file path") flag.StringVar(&logLevel, "log-level", "info", "log level") flag.DurationVar(&flushInterval, "flush-interval", 10*time.Second, "flush interval") + flag.IntVar(&fileIndexWidth, "file-index-width", + config.DefaultFileIndexWidth, "file index width") flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling") flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer") flag.Parse() @@ -393,7 +396,7 @@ func (c *consumer) syncExecDMLEvents( key cloudstorage.DmlPathKey, fileIdx uint64, ) error { - filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension) + filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension, fileIndexWidth) log.Debug("read from dml file path", zap.String("path", filePath)) content, err := c.externalStorage.ReadFile(ctx, filePath) if err != nil { diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 4cbf7fd655e..3a2d1610efa 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1391,6 +1391,9 @@ var doc = `{ "encoder-concurrency": { "type": "integer" }, + "file-index-digit": { + "type": "integer" + }, "only-output-updated-columns": { "type": "boolean" }, @@ -2220,6 +2223,9 @@ var doc = `{ "encoder_concurrency": { "type": "integer" }, + "file_index_width": { + "type": "integer" + }, "only_output_updated_columns": { "type": "boolean" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 4cb597f0fbe..ad11bd25928 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1372,6 +1372,9 @@ "encoder-concurrency": { "type": "integer" }, + "file-index-digit": { + "type": "integer" + }, "only-output-updated-columns": { "type": "boolean" }, @@ -2201,6 +2204,9 @@ "encoder_concurrency": { "type": "integer" }, + "file_index_width": { + "type": "integer" + },
"$ref": "#/definitions/v2.KafkaConfig" + }, + "mysql_config": { + "$ref": "#/definitions/v2.MySQLConfig" + }, +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) "only_output_updated_columns": { "type": "boolean" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index d4a948bf7e0..e7be08fe72e 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -63,6 +63,15 @@ definitions: type: boolean encoder-concurrency: type: integer +<<<<<<< HEAD +======= + file-index-digit: + type: integer + kafka-config: + $ref: '#/definitions/config.KafkaConfig' + mysql-config: + $ref: '#/definitions/config.MySQLConfig' +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) only-output-updated-columns: type: boolean protocol: @@ -619,6 +628,15 @@ definitions: type: boolean encoder_concurrency: type: integer +<<<<<<< HEAD +======= + file_index_width: + type: integer + kafka_config: + $ref: '#/definitions/v2.KafkaConfig' + mysql_config: + $ref: '#/definitions/v2.MySQLConfig' +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) only_output_updated_columns: type: boolean protocol: diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 50d9395ee28..eb02bc951a7 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -218,6 +218,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) { Terminator: "\r\n", DateSeparator: "day", EnablePartitionSeparator: true, + FileIndexWidth: config.DefaultFileIndexWidth, CSVConfig: &config.CSVConfig{ Delimiter: ",", Quote: "\"", diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 9627878b255..f02aca261b9 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -25,15 +25,14 @@ import ( "go.uber.org/zap" ) -// DefaultMaxMessageBytes sets the default value for max-message-bytes. -const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M - const ( + // DefaultMaxMessageBytes sets the default value for max-message-bytes. + DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M + // TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI. TxnAtomicityKey = "transaction-atomicity" // defaultTxnAtomicity is the default atomicity level. defaultTxnAtomicity = noneTxnAtomicity - // unknownTxnAtomicity is an invalid atomicity level and will be treated as // defaultTxnAtomicity when initializing sink in processor. unknownTxnAtomicity AtomicityLevel = "" @@ -41,9 +40,7 @@ const ( noneTxnAtomicity AtomicityLevel = "none" // tableTxnAtomicity means atomicity of single table transactions is guaranteed. tableTxnAtomicity AtomicityLevel = "table" -) -const ( // Comma is a constant for ',' Comma = "," // CR is an abbreviation for carriage return @@ -58,6 +55,13 @@ const ( Backslash = '\\' // NULL is a constant for '\N' NULL = "\\N" + + // MinFileIndexWidth is the minimum width of file index. + MinFileIndexWidth = 6 // enough for 2^19 files + // MaxFileIndexWidth is the maximum width of file index. + MaxFileIndexWidth = 20 // enough for 2^64 files + // DefaultFileIndexWidth is the default width of file index. + DefaultFileIndexWidth = MaxFileIndexWidth ) // AtomicityLevel represents the atomicity level of a changefeed. 
@@ -109,6 +113,7 @@ type SinkConfig struct { Terminator string `toml:"terminator" json:"terminator"` DateSeparator string `toml:"date-separator" json:"date-separator"` EnablePartitionSeparator bool `toml:"enable-partition-separator" json:"enable-partition-separator"` + FileIndexWidth int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"` // EnableKafkaSinkV2 enabled then the kafka-go sink will be used. EnableKafkaSinkV2 bool `toml:"enable-kafka-sink-v2" json:"enable-kafka-sink-v2"` @@ -133,6 +138,42 @@ type CSVConfig struct { IncludeCommitTs bool `toml:"include-commit-ts" json:"include-commit-ts"` } +func (c *CSVConfig) validateAndAdjust() error { + if c == nil { + return nil + } + + // validate quote + if len(c.Quote) > 1 { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config quote contains more than one character")) + } + if len(c.Quote) == 1 { + quote := c.Quote[0] + if quote == CR || quote == LF { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config quote cannot be line break character")) + } + } + + // validate delimiter + if len(c.Delimiter) == 0 { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config delimiter cannot be empty")) + } + if strings.ContainsRune(c.Delimiter, CR) || + strings.ContainsRune(c.Delimiter, LF) { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config delimiter contains line break characters")) + } + if len(c.Quote) > 0 && strings.Contains(c.Delimiter, c.Quote) { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config quote and delimiter cannot be the same")) + } + + return nil +} + // DateSeparator specifies the date separator in storage destination path type DateSeparator int @@ -235,48 +276,27 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error { s.Terminator = CRLF } - // validate date separator - if len(s.DateSeparator) > 0 { - var separator DateSeparator - if err := separator.FromString(s.DateSeparator); err != nil { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, err) + // validate storage sink related config + if sinkURI != nil && sink.IsStorageScheme(sinkURI.Scheme) { + // validate date separator + if len(s.DateSeparator) > 0 { + var separator DateSeparator + if err := separator.FromString(s.DateSeparator); err != nil { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, err) + } } - } - - if s.CSVConfig != nil { - return s.validateAndAdjustCSVConfig() - } - return nil -} - -func (s *SinkConfig) validateAndAdjustCSVConfig() error { - // validate quote - if len(s.CSVConfig.Quote) > 1 { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config quote contains more than one character")) - } - if len(s.CSVConfig.Quote) == 1 { - quote := s.CSVConfig.Quote[0] - if quote == CR || quote == LF { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config quote cannot be line break character")) + // File index width should be in [MinFileIndexWidth, MaxFileIndexWidth]. + // In most scenarios the user does not need to change this configuration, + // so the default config leaves it unset and we silently adjust it + // here when it is out of range.
+ if s.FileIndexWidth < MinFileIndexWidth || s.FileIndexWidth > MaxFileIndexWidth { + s.FileIndexWidth = DefaultFileIndexWidth } - } - // validate delimiter - if len(s.CSVConfig.Delimiter) == 0 { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config delimiter cannot be empty")) - } - if strings.ContainsRune(s.CSVConfig.Delimiter, CR) || - strings.ContainsRune(s.CSVConfig.Delimiter, LF) { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config delimiter contains line break characters")) - } - if len(s.CSVConfig.Quote) > 0 && strings.Contains(s.CSVConfig.Delimiter, s.CSVConfig.Quote) { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config quote and delimiter cannot be the same")) + if err := s.CSVConfig.validateAndAdjust(); err != nil { + return err + } } return nil diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index d377fcdebbf..af7051e74bc 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -260,7 +260,6 @@ func TestApplyParameterBySinkURI(t *testing.T) { } else { require.ErrorContains(t, err, tc.expectedErr) } - } } @@ -333,6 +332,7 @@ func TestCheckCompatibilityWithSinkURI(t *testing.T) { } func TestValidateAndAdjustCSVConfig(t *testing.T) { + t.Parallel() tests := []struct { name string config *CSVConfig @@ -393,16 +393,38 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) { wantErr: "csv config quote and delimiter cannot be the same", }, } - for _, tc := range tests { + for _, c := range tests { + tc := c t.Run(tc.name, func(t *testing.T) { + t.Parallel() s := &SinkConfig{ CSVConfig: tc.config, } if tc.wantErr == "" { - require.Nil(t, s.validateAndAdjustCSVConfig()) + require.Nil(t, s.CSVConfig.validateAndAdjust()) } else { - require.Regexp(t, tc.wantErr, s.validateAndAdjustCSVConfig()) + require.Regexp(t, tc.wantErr, s.CSVConfig.validateAndAdjust()) } }) } } + +func TestValidateAndAdjustStorageConfig(t *testing.T) { + t.Parallel() + + sinkURI, err := url.Parse("s3://bucket?protocol=csv") + require.NoError(t, err) + s := GetDefaultReplicaConfig() + err = s.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + require.Equal(t, DefaultFileIndexWidth, s.Sink.FileIndexWidth) + + err = s.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + require.Equal(t, DefaultFileIndexWidth, s.Sink.FileIndexWidth) + + s.Sink.FileIndexWidth = 16 + err = s.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + require.Equal(t, 16, s.Sink.FileIndexWidth) +} diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index e566a8d7eb1..5c191336af0 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -52,6 +52,7 @@ type Config struct { WorkerCount int FlushInterval time.Duration FileSize int + FileIndexWidth int DateSeparator string EnablePartitionSeparator bool } @@ -96,6 +97,7 @@ func (c *Config) Apply( c.DateSeparator = replicaConfig.Sink.DateSeparator c.EnablePartitionSeparator = replicaConfig.Sink.EnablePartitionSeparator + c.FileIndexWidth = replicaConfig.Sink.FileIndexWidth return nil } diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 36af97ad1b0..faa38d106f9 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -28,13 +28,18 @@ func TestConfigApply(t *testing.T) { expected.WorkerCount = 32 expected.FlushInterval = 10 * time.Second expected.FileSize = 16 * 1024 * 1024 + expected.FileIndexWidth = config.DefaultFileIndexWidth expected.DateSeparator = 
config.DateSeparatorNone.String() expected.EnablePartitionSeparator = true - uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216" + uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv" sinkURI, err := url.Parse(uri) require.Nil(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) cfg := NewConfig() - err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig()) + err = cfg.Apply(context.TODO(), sinkURI, replicaConfig) require.Nil(t, err) require.Equal(t, expected, cfg) } diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index ed5100202a0..9dd6a818fb7 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -34,7 +34,7 @@ import ( const ( // 3 is the length of "CDC", and the file number contains // at least 6 digits (e.g. CDC000001.csv). - minFileNamePrefixLen = 9 + minFileNamePrefixLen = 3 + config.MinFileIndexWidth defaultIndexFileName = "CDC.index" // The following constants are used to generate file paths. @@ -54,8 +54,8 @@ func IsSchemaFile(path string) bool { return schemaRE.MatchString(path) } -// MustParseSchemaName parses the version from the schema file name. -func MustParseSchemaName(path string) (uint64, uint32) { +// mustParseSchemaName parses the version from the schema file name. +func mustParseSchemaName(path string) (uint64, uint32) { reportErr := func(err error) { log.Panic("failed to parse schema file name", zap.String("schemaPath", path), @@ -99,6 +99,11 @@ func generateSchemaFilePath( schema, table, tableVersion, checksum) } +func generateDataFileName(index uint64, extension string, fileIndexWidth int) string { + indexFmt := "%0" + strconv.Itoa(fileIndexWidth) + "d" + return fmt.Sprintf("CDC"+indexFmt+"%s", index, extension) +} + type indexWithDate struct { index uint64 currDate, prevDate string @@ -180,7 +185,7 @@ func (f *FilePathGenerator) CheckOrWriteSchema( } // walk the table meta path to find the last schema file - _, checksum := MustParseSchemaName(tblSchemaFile) + _, checksum := mustParseSchemaName(tblSchemaFile) schemaFileCnt := 0 lastVersion := uint64(0) prefix := fmt.Sprintf(tableSchemaPrefix+"schema_", def.Schema, def.Table) @@ -191,7 +196,7 @@ func (f *FilePathGenerator) CheckOrWriteSchema( if !strings.HasSuffix(path, checksumSuffix) { return nil } - version, parsedChecksum := MustParseSchemaName(path) + version, parsedChecksum := mustParseSchemaName(path) if parsedChecksum != checksum { // TODO: parsedChecksum should be ignored, remove this panic // after the new path protocol is verified. @@ -254,6 +259,25 @@ func (f *FilePathGenerator) GenerateDateStr() string { return dateStr } +// GenerateIndexFilePath generates a canonical path for index file. +func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) string { + dir := f.generateDataDirPath(tbl, date) + name := defaultIndexFileName + return strings.Join([]string{dir, name}, "/") +} + +// GenerateDataFilePath generates a canonical path for data file. 
+func (f *FilePathGenerator) GenerateDataFilePath( + ctx context.Context, tbl VersionedTableName, date string, +) (string, error) { + dir := f.generateDataDirPath(tbl, date) + name, err := f.generateDataFileName(ctx, tbl, date) + if err != nil { + return "", err + } + return strings.Join([]string{dir, name}, "/"), nil +} + func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date string) string { var elems []string @@ -272,34 +296,9 @@ func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date str return strings.Join(elems, "/") } -func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, error) { - var fileIdx uint64 - var err error - - if len(fileName) < minFileNamePrefixLen+len(f.extension) || - !strings.HasPrefix(fileName, "CDC") || - !strings.HasSuffix(fileName, f.extension) { - return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName, - fmt.Errorf("'%s' is a invalid file name", fileName)) - } - - extIdx := strings.Index(fileName, f.extension) - fileIdxStr := fileName[3:extIdx] - if fileIdx, err = strconv.ParseUint(fileIdxStr, 10, 64); err != nil { - return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName, err) - } - - return fileIdx, nil -} - -// GenerateDataFilePath generates a canonical path for data file. -func (f *FilePathGenerator) GenerateDataFilePath( - ctx context.Context, - tbl VersionedTableName, - date string, +func (f *FilePathGenerator) generateDataFileName( + ctx context.Context, tbl VersionedTableName, date string, ) (string, error) { - var elems []string - elems = append(elems, f.generateDataDirPath(tbl, date)) if idx, ok := f.fileIndex[tbl]; !ok { fileIdx, err := f.getNextFileIdxFromIndexFile(ctx, tbl, date) if err != nil { @@ -320,19 +319,7 @@ func (f *FilePathGenerator) GenerateDataFilePath( f.fileIndex[tbl].index = 0 } f.fileIndex[tbl].index++ - elems = append(elems, fmt.Sprintf("CDC%06d%s", f.fileIndex[tbl].index, f.extension)) - - return strings.Join(elems, "/"), nil -} - -// GenerateIndexFilePath generates a canonical path for index file. 
-func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) string { - var elems []string - - elems = append(elems, f.generateDataDirPath(tbl, date)) - elems = append(elems, defaultIndexFileName) - - return strings.Join(elems, "/") + return generateDataFileName(f.fileIndex[tbl].index, f.extension, f.config.FileIndexWidth), nil } func (f *FilePathGenerator) getNextFileIdxFromIndexFile( @@ -358,8 +345,8 @@ func (f *FilePathGenerator) getNextFileIdxFromIndexFile( } lastFilePath := strings.Join([]string{ - f.generateDataDirPath(tbl, date), // file dir - fmt.Sprintf("CDC%06d%s", maxFileIdx, f.extension), // file name + f.generateDataDirPath(tbl, date), // file dir + generateDataFileName(maxFileIdx, f.extension, f.config.FileIndexWidth), // file name }, "/") var lastFileExists, lastFileIsEmpty bool @@ -392,3 +379,23 @@ func (f *FilePathGenerator) getNextFileIdxFromIndexFile( } return fileIdx, nil } + +func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, error) { + var fileIdx uint64 + var err error + + if len(fileName) < minFileNamePrefixLen+len(f.extension) || + !strings.HasPrefix(fileName, "CDC") || + !strings.HasSuffix(fileName, f.extension) { + return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName, + fmt.Errorf("'%s' is a invalid file name", fileName)) + } + + extIdx := strings.Index(fileName, f.extension) + fileIdxStr := fileName[3:extIdx] + if fileIdx, err = strconv.ParseUint(fileIdxStr, 10, 64); err != nil { + return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName, err) + } + + return fileIdx, nil +} diff --git a/pkg/sink/cloudstorage/path_key.go b/pkg/sink/cloudstorage/path_key.go index f8575c8cde6..e18636f1997 100644 --- a/pkg/sink/cloudstorage/path_key.go +++ b/pkg/sink/cloudstorage/path_key.go @@ -58,7 +58,7 @@ func (s *SchemaPathKey) ParseSchemaFilePath(path string) (uint32, error) { } schemaFileName := matches[len(matches)-1] - version, checksum := MustParseSchemaName(schemaFileName) + version, checksum := mustParseSchemaName(schemaFileName) *s = SchemaPathKey{ Schema: schema, @@ -76,7 +76,9 @@ type DmlPathKey struct { } // GenerateDMLFilePath generates the dml file path. 
-func (d *DmlPathKey) GenerateDMLFilePath(idx uint64, extension string) string { +func (d *DmlPathKey) GenerateDMLFilePath( + idx uint64, extension string, fileIndexWidth int, +) string { var elems []string elems = append(elems, d.Schema) @@ -89,7 +91,7 @@ func (d *DmlPathKey) GenerateDMLFilePath(idx uint64, extension string) string { if len(d.Date) != 0 { elems = append(elems, d.Date) } - elems = append(elems, fmt.Sprintf("CDC%06d%s", idx, extension)) + elems = append(elems, generateDataFileName(idx, extension, fileIndexWidth)) return strings.Join(elems, "/") } diff --git a/pkg/sink/cloudstorage/path_key_test.go b/pkg/sink/cloudstorage/path_key_test.go index da6bb55cecc..a78e91d1edd 100644 --- a/pkg/sink/cloudstorage/path_key_test.go +++ b/pkg/sink/cloudstorage/path_key_test.go @@ -56,3 +56,41 @@ func TestSchemaPathKey(t *testing.T) { require.Equal(t, tc.checksum, checksum) } } + +func TestDmlPathKey(t *testing.T) { + t.Parallel() + + testCases := []struct { + index int + fileIndexWidth int + extension string + path string + dmlkey DmlPathKey + }{ + { + index: 10, + fileIndexWidth: 20, + extension: ".csv", + path: "schema1/table1/123456/2023-05-09/CDC00000000000000000010.csv", + dmlkey: DmlPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: "schema1", + Table: "table1", + TableVersion: 123456, + }, + PartitionNum: 0, + Date: "2023-05-09", + }, + }, + } + + for _, tc := range testCases { + var dmlkey DmlPathKey + idx, err := dmlkey.ParseDMLFilePath("day", tc.path) + require.NoError(t, err) + require.Equal(t, tc.dmlkey, dmlkey) + + fileName := dmlkey.GenerateDMLFilePath(idx, tc.extension, tc.fileIndexWidth) + require.Equal(t, tc.path, fileName) + } +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index 72076fa6284..2f0b2673ba5 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -36,6 +36,7 @@ func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FileP require.NoError(t, err) replicaConfig := config.GetDefaultReplicaConfig() replicaConfig.Sink.Protocol = config.ProtocolOpen.String() + replicaConfig.Sink.FileIndexWidth = 6 cfg := NewConfig() err = cfg.Apply(ctx, sinkURI, replicaConfig) require.NoError(t, err)
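Note for reviewers: below is a minimal, self-contained sketch of the naming behavior this patch introduces, assuming only what the diff adds (generateDataFileName in pkg/sink/cloudstorage/path.go and the Min/Max/DefaultFileIndexWidth constants in pkg/config/sink.go); the standalone main wrapper is illustrative only. SinkConfig.validateAndAdjust silently resets an out-of-range file-index-digit to DefaultFileIndexWidth (20), so data files are padded to 20 digits by default, while the tests above pin the width to the minimum of 6.

package main

import (
	"fmt"
	"strconv"
)

// Mirrors generateDataFileName from this patch: the file index is
// zero-padded to the configured width before the extension is appended.
func generateDataFileName(index uint64, extension string, fileIndexWidth int) string {
	indexFmt := "%0" + strconv.Itoa(fileIndexWidth) + "d"
	return fmt.Sprintf("CDC"+indexFmt+"%s", index, extension)
}

func main() {
	// Default width (DefaultFileIndexWidth = 20):
	fmt.Println(generateDataFileName(10, ".csv", 20)) // CDC00000000000000000010.csv
	// Minimum width (MinFileIndexWidth = 6), as used in the tests above:
	fmt.Println(generateDataFileName(10, ".csv", 6)) // CDC000010.csv
}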