Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): add index config to storage sink (#8918) #8928

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
Terminator: c.Sink.Terminator,
DateSeparator: c.Sink.DateSeparator,
EnablePartitionSeparator: c.Sink.EnablePartitionSeparator,
FileIndexWidth:           c.Sink.FileIndexWidth,
}
}
if c.Mounter != nil {
Expand Down Expand Up @@ -383,6 +393,16 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
Terminator: cloned.Sink.Terminator,
DateSeparator: cloned.Sink.DateSeparator,
EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator,
FileIndexWidth:           cloned.Sink.FileIndexWidth,
}
}
if cloned.Consistent != nil {
Expand Down Expand Up @@ -502,6 +522,7 @@ type Table struct {
// SinkConfig represents sink config for a changefeed
// This is a duplicate of config.SinkConfig
type SinkConfig struct {
Protocol                 string            `json:"protocol"`
SchemaRegistry           string            `json:"schema_registry"`
CSVConfig                *CSVConfig        `json:"csv"`
DispatchRules            []*DispatchRule   `json:"dispatchers,omitempty"`
ColumnSelectors          []*ColumnSelector `json:"column_selectors"`
TxnAtomicity             string            `json:"transaction_atomicity"`
EncoderConcurrency       int               `json:"encoder_concurrency"`
Terminator               string            `json:"terminator"`
DateSeparator            string            `json:"date_separator"`
EnablePartitionSeparator bool              `json:"enable_partition_separator"`
FileIndexWidth           int               `json:"file_index_width"`
}

// CSVConfig denotes the csv config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,13 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
require.Nil(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolOpen.String()
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
Expand Down Expand Up @@ -163,6 +168,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolOpen.String()
replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String()
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
Expand Down
1 change: 1 addition & 0 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
require.Nil(t, err)
cfg := cloudstorage.NewConfig()
err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig())
cfg.FileIndexWidth = 6
require.Nil(t, err)

statistics := metrics.NewStatistics(ctx, sink.TxnSink)
Expand Down
7 changes: 7 additions & 0 deletions cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var (
logFile string
logLevel string
flushInterval time.Duration
fileIndexWidth int
enableProfiling bool
timezone string
)
Expand All @@ -80,6 +81,8 @@ func init() {
flag.StringVar(&logFile, "log-file", "", "log file path")
flag.StringVar(&logLevel, "log-level", "info", "log level")
flag.DurationVar(&flushInterval, "flush-interval", 10*time.Second, "flush interval")
flag.IntVar(&fileIndexWidth, "file-index-width",
config.DefaultFileIndexWidth, "file index width")
flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling")
flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer")
flag.Parse()
Expand Down Expand Up @@ -532,7 +535,11 @@ func (c *consumer) syncExecDMLEvents(
key dmlPathKey,
fileIdx uint64,
) error {
filePath := key.generateDMLFilePath(fileIdx, c.fileExtension, fileIndexWidth)
log.Debug("read from dml file path", zap.String("path", filePath))
content, err := c.externalStorage.ReadFile(ctx, filePath)
if err != nil {
Expand Down
30 changes: 30 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,21 @@ var doc = `{
"encoder-concurrency": {
"type": "integer"
},
"file-index-digit": {
"type": "integer"
},
"protocol": {
"type": "string"
},
Expand Down Expand Up @@ -1976,6 +1991,21 @@ var doc = `{
"encoder_concurrency": {
"type": "integer"
},
"file_index_width": {
"type": "integer"
},
"protocol": {
"type": "string"
},
Expand Down
30 changes: 30 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,21 @@
"encoder-concurrency": {
"type": "integer"
},
"file-index-digit": {
"type": "integer"
},
"protocol": {
"type": "string"
},
Expand Down Expand Up @@ -1957,6 +1972,21 @@
"encoder_concurrency": {
"type": "integer"
},
"file_index_width": {
"type": "integer"
},
"protocol": {
"type": "string"
},
Expand Down
22 changes: 22 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ definitions:
type: boolean
encoder-concurrency:
type: integer
file-index-digit:
type: integer
protocol:
type: string
schema-registry:
Expand Down Expand Up @@ -530,6 +541,17 @@ definitions:
type: boolean
encoder_concurrency:
type: integer
file_index_width:
type: integer
protocol:
type: string
schema_registry:
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
Terminator: "\r\n",
DateSeparator: "day",
EnablePartitionSeparator: true,
FileIndexWidth: config.DefaultFileIndexWidth,
CSVConfig: &config.CSVConfig{
Delimiter: ",",
Quote: "\"",
Expand Down
Loading