From 82397e1aef652b183cb2154ea046399d904058ea Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Wed, 10 May 2023 11:12:06 +0800 Subject: [PATCH] This is an automated cherry-pick of #8918 Signed-off-by: ti-chi-bot --- cdc/api/v2/model.go | 40 ++++ .../cloud_storage_dml_sink_test.go | 6 + .../eventsink/cloudstorage/dml_worker_test.go | 1 + cmd/storage-consumer/main.go | 7 + docs/swagger/docs.go | 30 +++ docs/swagger/swagger.json | 30 +++ docs/swagger/swagger.yaml | 22 ++ pkg/cmd/util/helper_test.go | 1 + pkg/config/sink.go | 117 ++++++---- pkg/config/sink_test.go | 30 ++- pkg/sink/cloudstorage/config.go | 2 + pkg/sink/cloudstorage/config_test.go | 9 +- pkg/sink/cloudstorage/path.go | 211 ++++++++++++++++-- pkg/sink/cloudstorage/path_key.go | 156 +++++++++++++ pkg/sink/cloudstorage/path_key_test.go | 96 ++++++++ pkg/sink/cloudstorage/path_test.go | 1 + 16 files changed, 691 insertions(+), 68 deletions(-) create mode 100644 pkg/sink/cloudstorage/path_key.go create mode 100644 pkg/sink/cloudstorage/path_key_test.go diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index b745e793940..20175b3d5a7 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -279,6 +279,16 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( Terminator: c.Sink.Terminator, DateSeparator: c.Sink.DateSeparator, EnablePartitionSeparator: c.Sink.EnablePartitionSeparator, +<<<<<<< HEAD +======= + FileIndexWidth: c.Sink.FileIndexWidth, + EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2, + OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns, + KafkaConfig: kafkaConfig, + MySQLConfig: mysqlConfig, + CloudStorageConfig: cloudStorageConfig, + SafeMode: c.Sink.SafeMode, +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) } } if c.Mounter != nil { @@ -383,6 +393,16 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { Terminator: cloned.Sink.Terminator, DateSeparator: cloned.Sink.DateSeparator, EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator, +<<<<<<< HEAD +======= + FileIndexWidth: cloned.Sink.FileIndexWidth, + EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2, + OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns, + KafkaConfig: kafkaConfig, + MySQLConfig: mysqlConfig, + CloudStorageConfig: cloudStorageConfig, + SafeMode: cloned.Sink.SafeMode, +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) } } if cloned.Consistent != nil { @@ -502,6 +522,7 @@ type Table struct { // SinkConfig represents sink config for a changefeed // This is a duplicate of config.SinkConfig type SinkConfig struct { +<<<<<<< HEAD Protocol string `json:"protocol"` SchemaRegistry string `json:"schema_registry"` CSVConfig *CSVConfig `json:"csv"` @@ -512,6 +533,25 @@ type SinkConfig struct { Terminator string `json:"terminator"` DateSeparator string `json:"date_separator"` EnablePartitionSeparator bool `json:"enable_partition_separator"` +======= + Protocol string `json:"protocol"` + SchemaRegistry string `json:"schema_registry"` + CSVConfig *CSVConfig `json:"csv"` + DispatchRules []*DispatchRule `json:"dispatchers,omitempty"` + ColumnSelectors []*ColumnSelector `json:"column_selectors"` + TxnAtomicity string `json:"transaction_atomicity"` + EncoderConcurrency int `json:"encoder_concurrency"` + Terminator string `json:"terminator"` + DateSeparator string `json:"date_separator"` + EnablePartitionSeparator bool `json:"enable_partition_separator"` + FileIndexWidth int `json:"file_index_width"` + 
EnableKafkaSinkV2 bool `json:"enable_kafka_sink_v2"` + OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns"` + SafeMode *bool `json:"safe_mode,omitempty"` + KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"` + MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"` + CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"` +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) } // CSVConfig denotes the csv config diff --git a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go index 42ec152d308..4ff212451e1 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go +++ b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go @@ -91,8 +91,13 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { require.Nil(t, err) replicaConfig := config.GetDefaultReplicaConfig() +<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go replicaConfig.Sink.Protocol = config.ProtocolOpen.String() +======= + replicaConfig.Sink.Protocol = config.ProtocolCsv.String() + replicaConfig.Sink.FileIndexWidth = 6 +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go errCh := make(chan error, 5) s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh) require.Nil(t, err) @@ -163,6 +168,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { replicaConfig := config.GetDefaultReplicaConfig() replicaConfig.Sink.Protocol = config.ProtocolOpen.String() replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String() + replicaConfig.Sink.FileIndexWidth = 6 errCh := make(chan error, 5) s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh) diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go index 6f1337d04e5..1f27674b504 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go +++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go @@ -46,6 +46,7 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker { require.Nil(t, err) cfg := cloudstorage.NewConfig() err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig()) + cfg.FileIndexWidth = 6 require.Nil(t, err) statistics := metrics.NewStatistics(ctx, sink.TxnSink) diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 75657f21891..bf4fefddb95 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -63,6 +63,7 @@ var ( logFile string logLevel string flushInterval time.Duration + fileIndexWidth int enableProfiling bool timezone string ) @@ -80,6 +81,8 @@ func init() { flag.StringVar(&logFile, "log-file", "", "log file path") flag.StringVar(&logLevel, "log-level", "info", "log level") flag.DurationVar(&flushInterval, "flush-interval", 10*time.Second, "flush interval") + flag.IntVar(&fileIndexWidth, "file-index-width", + config.DefaultFileIndexWidth, "file index width") flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling") flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer") flag.Parse() @@ -532,7 +535,11 @@ func (c *consumer) syncExecDMLEvents( key dmlPathKey, fileIdx uint64, ) error { +<<<<<<< HEAD filePath := key.generateDMLFilePath(fileIdx, c.fileExtension) +======= + filePath := key.GenerateDMLFilePath(fileIdx, 
c.fileExtension, fileIndexWidth) +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) log.Debug("read from dml file path", zap.String("path", filePath)) content, err := c.externalStorage.ReadFile(ctx, filePath) if err != nil { diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 94c395e9250..3400bb82275 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1271,6 +1271,21 @@ var doc = `{ "encoder-concurrency": { "type": "integer" }, +<<<<<<< HEAD +======= + "file-index-digit": { + "type": "integer" + }, + "kafka-config": { + "$ref": "#/definitions/config.KafkaConfig" + }, + "mysql-config": { + "$ref": "#/definitions/config.MySQLConfig" + }, + "only-output-updated-columns": { + "type": "boolean" + }, +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) "protocol": { "type": "string" }, @@ -1976,6 +1991,21 @@ var doc = `{ "encoder_concurrency": { "type": "integer" }, +<<<<<<< HEAD +======= + "file_index_width": { + "type": "integer" + }, + "kafka_config": { + "$ref": "#/definitions/v2.KafkaConfig" + }, + "mysql_config": { + "$ref": "#/definitions/v2.MySQLConfig" + }, + "only_output_updated_columns": { + "type": "boolean" + }, +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) "protocol": { "type": "string" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 03d0461e159..62c2d41dd25 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1252,6 +1252,21 @@ "encoder-concurrency": { "type": "integer" }, +<<<<<<< HEAD +======= + "file-index-digit": { + "type": "integer" + }, + "kafka-config": { + "$ref": "#/definitions/config.KafkaConfig" + }, + "mysql-config": { + "$ref": "#/definitions/config.MySQLConfig" + }, + "only-output-updated-columns": { + "type": "boolean" + }, +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) "protocol": { "type": "string" }, @@ -1957,6 +1972,21 @@ "encoder_concurrency": { "type": "integer" }, +<<<<<<< HEAD +======= + "file_index_width": { + "type": "integer" + }, + "kafka_config": { + "$ref": "#/definitions/v2.KafkaConfig" + }, + "mysql_config": { + "$ref": "#/definitions/v2.MySQLConfig" + }, + "only_output_updated_columns": { + "type": "boolean" + }, +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) "protocol": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index e515e7e7147..91fcff85b29 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -60,6 +60,17 @@ definitions: type: boolean encoder-concurrency: type: integer +<<<<<<< HEAD +======= + file-index-digit: + type: integer + kafka-config: + $ref: '#/definitions/config.KafkaConfig' + mysql-config: + $ref: '#/definitions/config.MySQLConfig' + only-output-updated-columns: + type: boolean +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) protocol: type: string schema-registry: @@ -530,6 +541,17 @@ definitions: type: boolean encoder_concurrency: type: integer +<<<<<<< HEAD +======= + file_index_width: + type: integer + kafka_config: + $ref: '#/definitions/v2.KafkaConfig' + mysql_config: + $ref: '#/definitions/v2.MySQLConfig' + only_output_updated_columns: + type: boolean +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) protocol: type: string schema_registry: diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 50d9395ee28..eb02bc951a7 100644 --- a/pkg/cmd/util/helper_test.go +++ 
b/pkg/cmd/util/helper_test.go @@ -218,6 +218,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) { Terminator: "\r\n", DateSeparator: "day", EnablePartitionSeparator: true, + FileIndexWidth: config.DefaultFileIndexWidth, CSVConfig: &config.CSVConfig{ Delimiter: ",", Quote: "\"", diff --git a/pkg/config/sink.go b/pkg/config/sink.go index e9b6e7d5ea0..a9b7b7b4c7e 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -25,15 +25,14 @@ import ( "go.uber.org/zap" ) -// DefaultMaxMessageBytes sets the default value for max-message-bytes. -const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M - const ( + // DefaultMaxMessageBytes sets the default value for max-message-bytes. + DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M + // TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI. TxnAtomicityKey = "transaction-atomicity" // defaultTxnAtomicity is the default atomicity level. defaultTxnAtomicity = noneTxnAtomicity - // unknownTxnAtomicity is an invalid atomicity level and will be treated as // defaultTxnAtomicity when initializing sink in processor. unknownTxnAtomicity AtomicityLevel = "" @@ -41,9 +40,7 @@ const ( noneTxnAtomicity AtomicityLevel = "none" // tableTxnAtomicity means atomicity of single table transactions is guaranteed. tableTxnAtomicity AtomicityLevel = "table" -) -const ( // Comma is a constant for ',' Comma = "," // CR is an abbreviation for carriage return @@ -58,6 +55,13 @@ const ( Backslash = '\\' // NULL is a constant for '\N' NULL = "\\N" + + // MinFileIndexWidth is the minimum width of file index. + MinFileIndexWidth = 6 // enough for 2^19 files + // MaxFileIndexWidth is the maximum width of file index. + MaxFileIndexWidth = 20 // enough for 2^64 files + // DefaultFileIndexWidth is the default width of file index. + DefaultFileIndexWidth = MaxFileIndexWidth ) // AtomicityLevel represents the atomicity level of a changefeed. @@ -109,6 +113,16 @@ type SinkConfig struct { Terminator string `toml:"terminator" json:"terminator"` DateSeparator string `toml:"date-separator" json:"date-separator"` EnablePartitionSeparator bool `toml:"enable-partition-separator" json:"enable-partition-separator"` +<<<<<<< HEAD +======= + FileIndexWidth int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"` + + // EnableKafkaSinkV2 enabled then the kafka-go sink will be used. + EnableKafkaSinkV2 bool `toml:"enable-kafka-sink-v2" json:"enable-kafka-sink-v2"` + + OnlyOutputUpdatedColumns *bool `toml:"only-output-updated-columns" json:"only-output-updated-columns"` + +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) // TiDBSourceID is the source ID of the upstream TiDB, // which is used to set the `tidb_cdc_write_source` session variable. // Note: This field is only used internally and only used in the MySQL sink. 
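// Not part of the patch: a minimal, self-contained sketch of what the new
// file-index-width constants above mean in practice. Storage data files are
// named CDC{index}{extension} with the index zero-padded to file-index-digit
// characters, so MinFileIndexWidth (6) gives CDC000001.csv while
// DefaultFileIndexWidth (20) gives CDC00000000000000000001.csv. The helper
// name below is hypothetical; the real formatting lives in
// generateDataFileName in pkg/sink/cloudstorage/path.go further down.
package main

import "fmt"

func paddedDataFileName(index uint64, extension string, width int) string {
	return fmt.Sprintf("CDC%0*d%s", width, index, extension)
}

func main() {
	fmt.Println(paddedDataFileName(1, ".csv", 6))  // CDC000001.csv
	fmt.Println(paddedDataFileName(1, ".csv", 20)) // CDC00000000000000000001.csv
}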
@@ -127,6 +141,42 @@ type CSVConfig struct { IncludeCommitTs bool `toml:"include-commit-ts" json:"include-commit-ts"` } +func (c *CSVConfig) validateAndAdjust() error { + if c == nil { + return nil + } + + // validate quote + if len(c.Quote) > 1 { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config quote contains more than one character")) + } + if len(c.Quote) == 1 { + quote := c.Quote[0] + if quote == CR || quote == LF { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config quote cannot be line break character")) + } + } + + // validate delimiter + if len(c.Delimiter) == 0 { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config delimiter cannot be empty")) + } + if strings.ContainsRune(c.Delimiter, CR) || + strings.ContainsRune(c.Delimiter, LF) { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config delimiter contains line break characters")) + } + if len(c.Quote) > 0 && strings.Contains(c.Delimiter, c.Quote) { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New("csv config quote and delimiter cannot be the same")) + } + + return nil +} + // DateSeparator specifies the date separator in storage destination path type DateSeparator int @@ -229,48 +279,27 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) er s.Terminator = CRLF } - // validate date separator - if len(s.DateSeparator) > 0 { - var separator DateSeparator - if err := separator.FromString(s.DateSeparator); err != nil { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, err) + // validate storage sink related config + if sinkURI != nil && sink.IsStorageScheme(sinkURI.Scheme) { + // validate date separator + if len(s.DateSeparator) > 0 { + var separator DateSeparator + if err := separator.FromString(s.DateSeparator); err != nil { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, err) + } } - } - - if s.CSVConfig != nil { - return s.validateAndAdjustCSVConfig() - } - - return nil -} -func (s *SinkConfig) validateAndAdjustCSVConfig() error { - // validate quote - if len(s.CSVConfig.Quote) > 1 { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config quote contains more than one character")) - } - if len(s.CSVConfig.Quote) == 1 { - quote := s.CSVConfig.Quote[0] - if quote == CR || quote == LF { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config quote cannot be line break character")) + // File index width should be in [minFileIndexWidth, maxFileIndexWidth]. + // In most scenarios, the user does not need to change this configuration, + // so the default value of this parameter is not set and just make silent + // adjustments here. 
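// Illustration only, not part of the diff: a hypothetical helper spelling out
// the silent adjustment performed just below for storage sinks. An unset
// file-index-digit (the zero value) or any out-of-range value falls back to
// DefaultFileIndexWidth; in-range values are kept, as the new
// TestValidateAndAdjustStorageConfig checks with a width of 16.
func clampFileIndexWidth(w int) int {
	if w < MinFileIndexWidth || w > MaxFileIndexWidth {
		return DefaultFileIndexWidth
	}
	return w
}

// clampFileIndexWidth(0)  == 20  (unset, use the default)
// clampFileIndexWidth(6)  == 6   (minimum width, kept)
// clampFileIndexWidth(16) == 16  (user-chosen width, kept)
// clampFileIndexWidth(21) == 20  (too wide, reset to the default)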
+ if s.FileIndexWidth < MinFileIndexWidth || s.FileIndexWidth > MaxFileIndexWidth { + s.FileIndexWidth = DefaultFileIndexWidth } - } - // validate delimiter - if len(s.CSVConfig.Delimiter) == 0 { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config delimiter cannot be empty")) - } - if strings.ContainsRune(s.CSVConfig.Delimiter, CR) || - strings.ContainsRune(s.CSVConfig.Delimiter, LF) { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config delimiter contains line break characters")) - } - if len(s.CSVConfig.Quote) > 0 && strings.Contains(s.CSVConfig.Delimiter, s.CSVConfig.Quote) { - return cerror.WrapError(cerror.ErrSinkInvalidConfig, - errors.New("csv config quote and delimiter cannot be the same")) + if err := s.CSVConfig.validateAndAdjust(); err != nil { + return err + } } return nil diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index d377fcdebbf..af7051e74bc 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -260,7 +260,6 @@ func TestApplyParameterBySinkURI(t *testing.T) { } else { require.ErrorContains(t, err, tc.expectedErr) } - } } @@ -333,6 +332,7 @@ func TestCheckCompatibilityWithSinkURI(t *testing.T) { } func TestValidateAndAdjustCSVConfig(t *testing.T) { + t.Parallel() tests := []struct { name string config *CSVConfig @@ -393,16 +393,38 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) { wantErr: "csv config quote and delimiter cannot be the same", }, } - for _, tc := range tests { + for _, c := range tests { + tc := c t.Run(tc.name, func(t *testing.T) { + t.Parallel() s := &SinkConfig{ CSVConfig: tc.config, } if tc.wantErr == "" { - require.Nil(t, s.validateAndAdjustCSVConfig()) + require.Nil(t, s.CSVConfig.validateAndAdjust()) } else { - require.Regexp(t, tc.wantErr, s.validateAndAdjustCSVConfig()) + require.Regexp(t, tc.wantErr, s.CSVConfig.validateAndAdjust()) } }) } } + +func TestValidateAndAdjustStorageConfig(t *testing.T) { + t.Parallel() + + sinkURI, err := url.Parse("s3://bucket?protocol=csv") + require.NoError(t, err) + s := GetDefaultReplicaConfig() + err = s.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + require.Equal(t, DefaultFileIndexWidth, s.Sink.FileIndexWidth) + + err = s.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + require.Equal(t, DefaultFileIndexWidth, s.Sink.FileIndexWidth) + + s.Sink.FileIndexWidth = 16 + err = s.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + require.Equal(t, 16, s.Sink.FileIndexWidth) +} diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index e566a8d7eb1..5c191336af0 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -52,6 +52,7 @@ type Config struct { WorkerCount int FlushInterval time.Duration FileSize int + FileIndexWidth int DateSeparator string EnablePartitionSeparator bool } @@ -96,6 +97,7 @@ func (c *Config) Apply( c.DateSeparator = replicaConfig.Sink.DateSeparator c.EnablePartitionSeparator = replicaConfig.Sink.EnablePartitionSeparator + c.FileIndexWidth = replicaConfig.Sink.FileIndexWidth return nil } diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 36af97ad1b0..faa38d106f9 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -28,13 +28,18 @@ func TestConfigApply(t *testing.T) { expected.WorkerCount = 32 expected.FlushInterval = 10 * time.Second expected.FileSize = 16 * 1024 * 1024 + expected.FileIndexWidth = config.DefaultFileIndexWidth expected.DateSeparator = 
config.DateSeparatorNone.String() expected.EnablePartitionSeparator = true - uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216" + uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv" sinkURI, err := url.Parse(uri) require.Nil(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) cfg := NewConfig() - err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig()) + err = cfg.Apply(context.TODO(), sinkURI, replicaConfig) require.Nil(t, err) require.Equal(t, expected, cfg) } diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index bd150c0847a..ddd9ace9c75 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -30,13 +30,71 @@ import ( const ( // 3 is the length of "CDC", and the file number contains // at least 6 digits (e.g. CDC000001.csv). - minFileNamePrefixLen = 9 + minFileNamePrefixLen = 3 + config.MinFileIndexWidth defaultIndexFileName = "CDC.index" ) +<<<<<<< HEAD // GenerateSchemaFilePath generates schema file path based on the table definition. func GenerateSchemaFilePath(def TableDefinition) string { return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion) +======= +var schemaRE = regexp.MustCompile(`meta/schema_\d+_\d{10}\.json$`) + +// IsSchemaFile checks whether the file is a schema file. +func IsSchemaFile(path string) bool { + return schemaRE.MatchString(path) +} + +// mustParseSchemaName parses the version from the schema file name. +func mustParseSchemaName(path string) (uint64, uint32) { + reportErr := func(err error) { + log.Panic("failed to parse schema file name", + zap.String("schemaPath", path), + zap.Any("error", err)) + } + + // For //meta/schema_{tableVersion}_{checksum}.json, the parts + // should be ["/
/meta/schema", "{tableVersion}", "{checksum}.json"]. + parts := strings.Split(path, "_") + if len(parts) < 3 { + reportErr(errors.New("invalid path format")) + } + + checksum := strings.TrimSuffix(parts[len(parts)-1], ".json") + tableChecksum, err := strconv.ParseUint(checksum, 10, 64) + if err != nil { + reportErr(err) + } + version := parts[len(parts)-2] + tableVersion, err := strconv.ParseUint(version, 10, 64) + if err != nil { + reportErr(err) + } + return tableVersion, uint32(tableChecksum) +} + +func generateSchemaFilePath( + schema, table string, tableVersion uint64, checksum uint32, +) string { + if schema == "" || tableVersion == 0 { + log.Panic("invalid schema or tableVersion", + zap.String("schema", schema), zap.Uint64("tableVersion", tableVersion)) + } + if table == "" { + // Generate db schema file path. + return fmt.Sprintf(dbSchemaPrefix+schemaFileNameFormat, + schema, tableVersion, checksum) + } + // Generate table schema file path. + return fmt.Sprintf(tableSchemaPrefix+schemaFileNameFormat, + schema, table, tableVersion, checksum) +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) +} + +func generateDataFileName(index uint64, extension string, fileIndexWidth int) string { + indexFmt := "%0" + strconv.Itoa(fileIndexWidth) + "d" + return fmt.Sprintf("CDC"+indexFmt+"%s", index, extension) } type indexWithDate struct { @@ -81,6 +139,93 @@ func NewFilePathGenerator( } } +<<<<<<< HEAD +======= +// CheckOrWriteSchema checks whether the schema file exists in the storage and +// write scheme.json if necessary. +func (f *FilePathGenerator) CheckOrWriteSchema( + ctx context.Context, + table VersionedTableName, + tableInfo *model.TableInfo, +) error { + if _, ok := f.versionMap[table]; ok { + return nil + } + + var def TableDefinition + def.FromTableInfo(tableInfo, table.TableInfoVersion) + if !def.IsTableSchema() { + // only check schema for table + log.Panic("invalid table schema", zap.Any("versionedTableName", table), + zap.Any("tableInfo", tableInfo)) + } + + // Case 1: point check if the schema file exists. + tblSchemaFile, err := def.GenerateSchemaFilePath() + if err != nil { + return err + } + exist, err := f.storage.FileExists(ctx, tblSchemaFile) + if err != nil { + return err + } + if exist { + f.versionMap[table] = table.TableInfoVersion + return nil + } + + // walk the table meta path to find the last schema file + _, checksum := mustParseSchemaName(tblSchemaFile) + schemaFileCnt := 0 + lastVersion := uint64(0) + prefix := fmt.Sprintf(tableSchemaPrefix+"schema_", def.Schema, def.Table) + checksumSuffix := fmt.Sprintf("%010d.json", checksum) + err = f.storage.WalkDir(ctx, &storage.WalkOption{ObjPrefix: prefix}, + func(path string, _ int64) error { + schemaFileCnt++ + if !strings.HasSuffix(path, checksumSuffix) { + return nil + } + version, parsedChecksum := mustParseSchemaName(path) + if parsedChecksum != checksum { + // TODO: parsedChecksum should be ignored, remove this panic + // after the new path protocol is verified. + log.Panic("invalid schema file name", + zap.String("path", path), zap.Any("checksum", checksum)) + } + if version > lastVersion { + lastVersion = version + } + return nil + }, + ) + if err != nil { + return err + } + + // Case 2: the table meta path is not empty. 
+ if schemaFileCnt != 0 { + if lastVersion == 0 { + log.Panic("no table schema file found in an non-empty meta path", + zap.Any("versionedTableName", table), + zap.Uint32("checksum", checksum)) + } + f.versionMap[table] = lastVersion + return nil + } + + // Case 3: the table meta path is empty, which only happens when the table is + // existed before changefeed started. We need to write schema file to external + // storage. + encodedDetail, err := def.MarshalWithQuery() + if err != nil { + return err + } + f.versionMap[table] = table.TableInfoVersion + return f.storage.WriteFile(ctx, tblSchemaFile, encodedDetail) +} + +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) // SetClock is used for unit test func (f *FilePathGenerator) SetClock(clock clock.Clock) { f.clock = clock @@ -111,6 +256,25 @@ func (f *FilePathGenerator) GenerateDateStr() string { return dateStr } +// GenerateIndexFilePath generates a canonical path for index file. +func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) string { + dir := f.generateDataDirPath(tbl, date) + name := defaultIndexFileName + return strings.Join([]string{dir, name}, "/") +} + +// GenerateDataFilePath generates a canonical path for data file. +func (f *FilePathGenerator) GenerateDataFilePath( + ctx context.Context, tbl VersionedTableName, date string, +) (string, error) { + dir := f.generateDataDirPath(tbl, date) + name, err := f.generateDataFileName(ctx, tbl, date) + if err != nil { + return "", err + } + return strings.Join([]string{dir, name}, "/"), nil +} + func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date string) string { var elems []string @@ -129,6 +293,7 @@ func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date str return strings.Join(elems, "/") } +<<<<<<< HEAD func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, error) { var fileIdx uint64 var err error @@ -154,9 +319,11 @@ func (f *FilePathGenerator) GenerateDataFilePath( ctx context.Context, tbl VersionedTableName, date string, +======= +func (f *FilePathGenerator) generateDataFileName( + ctx context.Context, tbl VersionedTableName, date string, +>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)) ) (string, error) { - var elems []string - elems = append(elems, f.generateDataDirPath(tbl, date)) if idx, ok := f.fileIndex[tbl]; !ok { fileIdx, err := f.getNextFileIdxFromIndexFile(ctx, tbl, date) if err != nil { @@ -177,19 +344,7 @@ func (f *FilePathGenerator) GenerateDataFilePath( f.fileIndex[tbl].index = 0 } f.fileIndex[tbl].index++ - elems = append(elems, fmt.Sprintf("CDC%06d%s", f.fileIndex[tbl].index, f.extension)) - - return strings.Join(elems, "/"), nil -} - -// GenerateIndexFilePath generates a canonical path for index file. 
-func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) string { - var elems []string - - elems = append(elems, f.generateDataDirPath(tbl, date)) - elems = append(elems, defaultIndexFileName) - - return strings.Join(elems, "/") + return generateDataFileName(f.fileIndex[tbl].index, f.extension, f.config.FileIndexWidth), nil } func (f *FilePathGenerator) getNextFileIdxFromIndexFile( @@ -215,8 +370,8 @@ func (f *FilePathGenerator) getNextFileIdxFromIndexFile( } lastFilePath := strings.Join([]string{ - f.generateDataDirPath(tbl, date), // file dir - fmt.Sprintf("CDC%06d%s", maxFileIdx, f.extension), // file name + f.generateDataDirPath(tbl, date), // file dir + generateDataFileName(maxFileIdx, f.extension, f.config.FileIndexWidth), // file name }, "/") var lastFileExists, lastFileIsEmpty bool @@ -249,3 +404,23 @@ func (f *FilePathGenerator) getNextFileIdxFromIndexFile( } return fileIdx, nil } + +func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, error) { + var fileIdx uint64 + var err error + + if len(fileName) < minFileNamePrefixLen+len(f.extension) || + !strings.HasPrefix(fileName, "CDC") || + !strings.HasSuffix(fileName, f.extension) { + return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName, + fmt.Errorf("'%s' is a invalid file name", fileName)) + } + + extIdx := strings.Index(fileName, f.extension) + fileIdxStr := fileName[3:extIdx] + if fileIdx, err = strconv.ParseUint(fileIdxStr, 10, 64); err != nil { + return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName, err) + } + + return fileIdx, nil +} diff --git a/pkg/sink/cloudstorage/path_key.go b/pkg/sink/cloudstorage/path_key.go new file mode 100644 index 00000000000..e18636f1997 --- /dev/null +++ b/pkg/sink/cloudstorage/path_key.go @@ -0,0 +1,156 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloudstorage + +import ( + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/quotes" +) + +// SchemaPathKey is the key of schema path. +type SchemaPathKey struct { + Schema string + Table string + TableVersion uint64 +} + +// GetKey returns the key of schema path. +func (s *SchemaPathKey) GetKey() string { + return quotes.QuoteSchema(s.Schema, s.Table) +} + +// ParseSchemaFilePath parses the schema file path and returns the table version and checksum. +func (s *SchemaPathKey) ParseSchemaFilePath(path string) (uint32, error) { + // For /
/meta/schema_{tableVersion}_{checksum}.json, the parts + // should be ["", "
", "meta", "schema_{tableVersion}_{checksum}.json"]. + matches := strings.Split(path, "/") + + var schema, table string + schema = matches[0] + switch len(matches) { + case 3: + table = "" + case 4: + table = matches[1] + default: + return 0, errors.Trace(fmt.Errorf("cannot match schema path pattern for %s", path)) + } + + if matches[len(matches)-2] != "meta" { + return 0, errors.Trace(fmt.Errorf("cannot match schema path pattern for %s", path)) + } + + schemaFileName := matches[len(matches)-1] + version, checksum := mustParseSchemaName(schemaFileName) + + *s = SchemaPathKey{ + Schema: schema, + Table: table, + TableVersion: version, + } + return checksum, nil +} + +// DmlPathKey is the key of dml path. +type DmlPathKey struct { + SchemaPathKey + PartitionNum int64 + Date string +} + +// GenerateDMLFilePath generates the dml file path. +func (d *DmlPathKey) GenerateDMLFilePath( + idx uint64, extension string, fileIndexWidth int, +) string { + var elems []string + + elems = append(elems, d.Schema) + elems = append(elems, d.Table) + elems = append(elems, fmt.Sprintf("%d", d.TableVersion)) + + if d.PartitionNum != 0 { + elems = append(elems, fmt.Sprintf("%d", d.PartitionNum)) + } + if len(d.Date) != 0 { + elems = append(elems, d.Date) + } + elems = append(elems, generateDataFileName(idx, extension, fileIndexWidth)) + + return strings.Join(elems, "/") +} + +// ParseDMLFilePath parses the dml file path and returns the max file index. +// DML file path pattern is as follows: +// {schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/, where +// partition-separator and date-separator could be empty. +// DML file name pattern is as follows: CDC{num}.extension. +func (d *DmlPathKey) ParseDMLFilePath(dateSeparator, path string) (uint64, error) { + var partitionNum int64 + + str := `(\w+)\/(\w+)\/(\d+)\/(\d+)?\/*` + switch dateSeparator { + case config.DateSeparatorNone.String(): + str += `(\d{4})*` + case config.DateSeparatorYear.String(): + str += `(\d{4})\/` + case config.DateSeparatorMonth.String(): + str += `(\d{4}-\d{2})\/` + case config.DateSeparatorDay.String(): + str += `(\d{4}-\d{2}-\d{2})\/` + } + str += `CDC(\d+).\w+` + pathRE, err := regexp.Compile(str) + if err != nil { + return 0, err + } + + matches := pathRE.FindStringSubmatch(path) + if len(matches) != 7 { + return 0, fmt.Errorf("cannot match dml path pattern for %s", path) + } + + version, err := strconv.ParseUint(matches[3], 10, 64) + if err != nil { + return 0, err + } + + if len(matches[4]) > 0 { + partitionNum, err = strconv.ParseInt(matches[4], 10, 64) + if err != nil { + return 0, err + } + } + fileIdx, err := strconv.ParseUint(strings.TrimLeft(matches[6], "0"), 10, 64) + if err != nil { + return 0, err + } + + *d = DmlPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: matches[1], + Table: matches[2], + TableVersion: version, + }, + PartitionNum: partitionNum, + Date: matches[5], + } + + return fileIdx, nil +} diff --git a/pkg/sink/cloudstorage/path_key_test.go b/pkg/sink/cloudstorage/path_key_test.go new file mode 100644 index 00000000000..a78e91d1edd --- /dev/null +++ b/pkg/sink/cloudstorage/path_key_test.go @@ -0,0 +1,96 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cloudstorage
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSchemaPathKey(t *testing.T) {
+	t.Parallel()
+
+	testCases := []struct {
+		path      string
+		schemakey SchemaPathKey
+		checksum  uint32
+	}{
+		// Test for database schema path: {schema}/meta/schema_{tableVersion}_{checksum}.json
+		{
+			path: "test_schema/meta/schema_1_2.json",
+			schemakey: SchemaPathKey{
+				Schema:       "test_schema",
+				Table:        "",
+				TableVersion: 1,
+			},
+			checksum: 2,
+		},
+		// Test for table schema path: {schema}/{table}/meta/schema_{tableVersion}_{checksum}.json
+		{
+			path: "test_schema/test_table/meta/schema_11_22.json",
+			schemakey: SchemaPathKey{
+				Schema:       "test_schema",
+				Table:        "test_table",
+				TableVersion: 11,
+			},
+			checksum: 22,
+		},
+	}
+	for _, tc := range testCases {
+		var schemaKey SchemaPathKey
+		checksum, err := schemaKey.ParseSchemaFilePath(tc.path)
+		require.NoError(t, err)
+		require.Equal(t, tc.schemakey, schemaKey)
+		require.Equal(t, tc.checksum, checksum)
+	}
+}
+
+func TestDmlPathKey(t *testing.T) {
+	t.Parallel()
+
+	testCases := []struct {
+		index          int
+		fileIndexWidth int
+		extension      string
+		path           string
+		dmlkey         DmlPathKey
+	}{
+		{
+			index:          10,
+			fileIndexWidth: 20,
+			extension:      ".csv",
+			path:           "schema1/table1/123456/2023-05-09/CDC00000000000000000010.csv",
+			dmlkey: DmlPathKey{
+				SchemaPathKey: SchemaPathKey{
+					Schema:       "schema1",
+					Table:        "table1",
+					TableVersion: 123456,
+				},
+				PartitionNum: 0,
+				Date:         "2023-05-09",
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		var dmlkey DmlPathKey
+		idx, err := dmlkey.ParseDMLFilePath("day", tc.path)
+		require.NoError(t, err)
+		require.Equal(t, tc.dmlkey, dmlkey)
+
+		fileName := dmlkey.GenerateDMLFilePath(idx, tc.extension, tc.fileIndexWidth)
+		require.Equal(t, tc.path, fileName)
+	}
+}
diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go
index 9f57e153e27..d256fe50b14 100644
--- a/pkg/sink/cloudstorage/path_test.go
+++ b/pkg/sink/cloudstorage/path_test.go
@@ -36,6 +36,7 @@ func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FileP
 	require.NoError(t, err)
 	replicaConfig := config.GetDefaultReplicaConfig()
 	replicaConfig.Sink.Protocol = config.ProtocolOpen.String()
+	replicaConfig.Sink.FileIndexWidth = 6
 	cfg := NewConfig()
 	err = cfg.Apply(ctx, sinkURI, replicaConfig)
 	require.NoError(t, err)
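// Not part of the patch: an illustrative Example in the style of
// TestDmlPathKey above (it would additionally need the fmt import), showing
// that fileIndexWidth only changes the zero padding of the data file name,
// never the directory layout.
func ExampleDmlPathKey_GenerateDMLFilePath() {
	key := DmlPathKey{
		SchemaPathKey: SchemaPathKey{Schema: "schema1", Table: "table1", TableVersion: 123456},
		Date:          "2023-05-09",
	}
	fmt.Println(key.GenerateDMLFilePath(10, ".csv", 6))
	fmt.Println(key.GenerateDMLFilePath(10, ".csv", 20))
	// Output:
	// schema1/table1/123456/2023-05-09/CDC000010.csv
	// schema1/table1/123456/2023-05-09/CDC00000000000000000010.csv
}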
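// Usage sketch, not part of the diff: how the sink side is expected to use
// the FilePathGenerator API refactored in pkg/sink/cloudstorage/path.go.
// It assumes an initialized generator f, a VersionedTableName tbl, and a
// context ctx; the variable names are illustrative only.
date := f.GenerateDateStr()
indexPath := f.GenerateIndexFilePath(tbl, date)         // .../{date}/CDC.index
dataPath, err := f.GenerateDataFilePath(ctx, tbl, date) // .../{date}/CDC00000000000000000001.csv
if err != nil {
	return err
}
// The CDC.index bookkeeping file always lives in the same directory as the
// data files whose next index it records, which is what
// getNextFileIdxFromIndexFile reads back when a changefeed restarts.
_, _ = indexPath, dataPath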