Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#8918
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed May 10, 2023
1 parent bf96a69 commit 82397e1
Show file tree
Hide file tree
Showing 16 changed files with 691 additions and 68 deletions.
40 changes: 40 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
Terminator: c.Sink.Terminator,
DateSeparator: c.Sink.DateSeparator,
EnablePartitionSeparator: c.Sink.EnablePartitionSeparator,
<<<<<<< HEAD
=======
FileIndexWidth: c.Sink.FileIndexWidth,
EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: c.Sink.SafeMode,
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
}
}
if c.Mounter != nil {
Expand Down Expand Up @@ -383,6 +393,16 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
Terminator: cloned.Sink.Terminator,
DateSeparator: cloned.Sink.DateSeparator,
EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator,
<<<<<<< HEAD
=======
FileIndexWidth: cloned.Sink.FileIndexWidth,
EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: cloned.Sink.SafeMode,
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
}
}
if cloned.Consistent != nil {
Expand Down Expand Up @@ -502,6 +522,7 @@ type Table struct {
// SinkConfig represents sink config for a changefeed
// This is a duplicate of config.SinkConfig
type SinkConfig struct {
<<<<<<< HEAD
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
Expand All @@ -512,6 +533,25 @@ type SinkConfig struct {
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
=======
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
DispatchRules []*DispatchRule `json:"dispatchers,omitempty"`
ColumnSelectors []*ColumnSelector `json:"column_selectors"`
TxnAtomicity string `json:"transaction_atomicity"`
EncoderConcurrency int `json:"encoder_concurrency"`
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
FileIndexWidth int `json:"file_index_width"`
EnableKafkaSinkV2 bool `json:"enable_kafka_sink_v2"`
OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns"`
SafeMode *bool `json:"safe_mode,omitempty"`
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
}

// CSVConfig denotes the csv config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,13 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
require.Nil(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
replicaConfig.Sink.Protocol = config.ProtocolOpen.String()

=======
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()
replicaConfig.Sink.FileIndexWidth = 6
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
errCh := make(chan error, 5)
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
Expand Down Expand Up @@ -163,6 +168,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolOpen.String()
replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String()
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
Expand Down
1 change: 1 addition & 0 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
require.Nil(t, err)
cfg := cloudstorage.NewConfig()
err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig())
cfg.FileIndexWidth = 6
require.Nil(t, err)

statistics := metrics.NewStatistics(ctx, sink.TxnSink)
Expand Down
7 changes: 7 additions & 0 deletions cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var (
logFile string
logLevel string
flushInterval time.Duration
fileIndexWidth int
enableProfiling bool
timezone string
)
Expand All @@ -80,6 +81,8 @@ func init() {
flag.StringVar(&logFile, "log-file", "", "log file path")
flag.StringVar(&logLevel, "log-level", "info", "log level")
flag.DurationVar(&flushInterval, "flush-interval", 10*time.Second, "flush interval")
flag.IntVar(&fileIndexWidth, "file-index-width",
config.DefaultFileIndexWidth, "file index width")
flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling")
flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer")
flag.Parse()
Expand Down Expand Up @@ -532,7 +535,11 @@ func (c *consumer) syncExecDMLEvents(
key dmlPathKey,
fileIdx uint64,
) error {
<<<<<<< HEAD
filePath := key.generateDMLFilePath(fileIdx, c.fileExtension)
=======
filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension, fileIndexWidth)
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
log.Debug("read from dml file path", zap.String("path", filePath))
content, err := c.externalStorage.ReadFile(ctx, filePath)
if err != nil {
Expand Down
30 changes: 30 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,21 @@ var doc = `{
"encoder-concurrency": {
"type": "integer"
},
<<<<<<< HEAD
=======
"file-index-digit": {
"type": "integer"
},
"kafka-config": {
"$ref": "#/definitions/config.KafkaConfig"
},
"mysql-config": {
"$ref": "#/definitions/config.MySQLConfig"
},
"only-output-updated-columns": {
"type": "boolean"
},
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
"protocol": {
"type": "string"
},
Expand Down Expand Up @@ -1976,6 +1991,21 @@ var doc = `{
"encoder_concurrency": {
"type": "integer"
},
<<<<<<< HEAD
=======
"file_index_width": {
"type": "integer"
},
"kafka_config": {
"$ref": "#/definitions/v2.KafkaConfig"
},
"mysql_config": {
"$ref": "#/definitions/v2.MySQLConfig"
},
"only_output_updated_columns": {
"type": "boolean"
},
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
"protocol": {
"type": "string"
},
Expand Down
30 changes: 30 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,21 @@
"encoder-concurrency": {
"type": "integer"
},
<<<<<<< HEAD
=======
"file-index-digit": {
"type": "integer"
},
"kafka-config": {
"$ref": "#/definitions/config.KafkaConfig"
},
"mysql-config": {
"$ref": "#/definitions/config.MySQLConfig"
},
"only-output-updated-columns": {
"type": "boolean"
},
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
"protocol": {
"type": "string"
},
Expand Down Expand Up @@ -1957,6 +1972,21 @@
"encoder_concurrency": {
"type": "integer"
},
<<<<<<< HEAD
=======
"file_index_width": {
"type": "integer"
},
"kafka_config": {
"$ref": "#/definitions/v2.KafkaConfig"
},
"mysql_config": {
"$ref": "#/definitions/v2.MySQLConfig"
},
"only_output_updated_columns": {
"type": "boolean"
},
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
"protocol": {
"type": "string"
},
Expand Down
22 changes: 22 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ definitions:
type: boolean
encoder-concurrency:
type: integer
<<<<<<< HEAD
=======
file-index-digit:
type: integer
kafka-config:
$ref: '#/definitions/config.KafkaConfig'
mysql-config:
$ref: '#/definitions/config.MySQLConfig'
only-output-updated-columns:
type: boolean
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
protocol:
type: string
schema-registry:
Expand Down Expand Up @@ -530,6 +541,17 @@ definitions:
type: boolean
encoder_concurrency:
type: integer
<<<<<<< HEAD
=======
file_index_width:
type: integer
kafka_config:
$ref: '#/definitions/v2.KafkaConfig'
mysql_config:
$ref: '#/definitions/v2.MySQLConfig'
only_output_updated_columns:
type: boolean
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
protocol:
type: string
schema_registry:
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
Terminator: "\r\n",
DateSeparator: "day",
EnablePartitionSeparator: true,
FileIndexWidth: config.DefaultFileIndexWidth,
CSVConfig: &config.CSVConfig{
Delimiter: ",",
Quote: "\"",
Expand Down
Loading

0 comments on commit 82397e1

Please sign in to comment.