Commit aba607a

This is an automated cherry-pick of #8918

Signed-off-by: ti-chi-bot <[email protected]>
CharlesCheung96 authored and ti-chi-bot committed May 10, 2023
1 parent 98d0de6 commit aba607a
Showing 16 changed files with 293 additions and 102 deletions.
22 changes: 22 additions & 0 deletions cdc/api/v2/model.go
@@ -281,6 +281,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
Terminator: c.Sink.Terminator,
DateSeparator: c.Sink.DateSeparator,
EnablePartitionSeparator: c.Sink.EnablePartitionSeparator,
FileIndexWidth: c.Sink.FileIndexWidth,
EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
}
@@ -400,6 +401,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
Terminator: cloned.Sink.Terminator,
DateSeparator: cloned.Sink.DateSeparator,
EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator,
FileIndexWidth: cloned.Sink.FileIndexWidth,
EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
}
@@ -536,6 +538,7 @@ type Table struct {
// SinkConfig represents sink config for a changefeed
// This is a duplicate of config.SinkConfig
type SinkConfig struct {
<<<<<<< HEAD
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
@@ -548,6 +551,25 @@ type SinkConfig struct {
EnablePartitionSeparator bool `json:"enable_partition_separator"`
EnableKafkaSinkV2 bool `json:"enable_kafka_sink_v_2"`
OnlyOutputUpdatedColumns bool `json:"only_output_updated_columns"`
=======
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
DispatchRules []*DispatchRule `json:"dispatchers,omitempty"`
ColumnSelectors []*ColumnSelector `json:"column_selectors"`
TxnAtomicity string `json:"transaction_atomicity"`
EncoderConcurrency int `json:"encoder_concurrency"`
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
FileIndexWidth int `json:"file_index_width"`
EnableKafkaSinkV2 bool `json:"enable_kafka_sink_v2"`
OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns"`
SafeMode *bool `json:"safe_mode,omitempty"`
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
}

// CSVConfig denotes the csv config
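
The conflict above pits the HEAD branch, which predates the storage-sink options, against the incoming #8918 side, which adds FileIndexWidth along with SafeMode, KafkaConfig, MySQLConfig, and CloudStorageConfig. A minimal sketch of populating the new field once the conflict is resolved toward the incoming side; the literal values and the marshalling are illustrative, not part of this commit:

package main

import (
    "encoding/json"
    "fmt"

    v2 "github.com/pingcap/tiflow/cdc/api/v2"
)

func main() {
    cfg := v2.SinkConfig{
        Protocol:       "csv",
        DateSeparator:  "day",
        FileIndexWidth: 20, // serialized as "file_index_width" in the v2 API
    }
    b, _ := json.Marshal(cfg)
    fmt.Println(string(b))
}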
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
@@ -97,7 +97,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()

replicaConfig.Sink.FileIndexWidth = 6
errCh := make(chan error, 5)
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
@@ -172,6 +172,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()
replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String()
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
1 change: 1 addition & 0 deletions cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
@@ -46,6 +46,7 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
require.Nil(t, err)
cfg := cloudstorage.NewConfig()
err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig())
cfg.FileIndexWidth = 6
require.Nil(t, err)

statistics := metrics.NewStatistics(ctx, sink.TxnSink)
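
Both test files pin FileIndexWidth to 6, the minimum width, so the file names the sink produces stay short and stable for assertions. Assuming the index is zero-padded in fmt "%0*d" style (a sketch of the naming convention, not the sink's exact code path):

package main

import "fmt"

func main() {
    // With width 6, the first CSV data file index renders as:
    fmt.Printf("%0*d.csv\n", 6, 1) // prints 000001.csv
}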
5 changes: 4 additions & 1 deletion cmd/storage-consumer/main.go
@@ -62,6 +62,7 @@ var (
logFile string
logLevel string
flushInterval time.Duration
fileIndexWidth int
enableProfiling bool
timezone string
)
@@ -79,6 +80,8 @@ func init() {
flag.StringVar(&logFile, "log-file", "", "log file path")
flag.StringVar(&logLevel, "log-level", "info", "log level")
flag.DurationVar(&flushInterval, "flush-interval", 10*time.Second, "flush interval")
flag.IntVar(&fileIndexWidth, "file-index-width",
config.DefaultFileIndexWidth, "file index width")
flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling")
flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer")
flag.Parse()
@@ -393,7 +396,7 @@ func (c *consumer) syncExecDMLEvents(
key cloudstorage.DmlPathKey,
fileIdx uint64,
) error {
filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension)
filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension, fileIndexWidth)
log.Debug("read from dml file path", zap.String("path", filePath))
content, err := c.externalStorage.ReadFile(ctx, filePath)
if err != nil {
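
Because the consumer rebuilds file paths independently of the changefeed that wrote them, its -file-index-width flag has to match the width the sink used; its default, config.DefaultFileIndexWidth, matches the sink's default. A hypothetical helper showing the assumed padding behavior (indexedFileName is illustrative and not a function in this repository):

package main

import "fmt"

// indexedFileName zero-pads fileIdx to width digits, mirroring what
// GenerateDMLFilePath is assumed to do with its new width parameter.
func indexedFileName(fileIdx uint64, extension string, width int) string {
    return fmt.Sprintf("%0*d%s", width, fileIdx, extension)
}

func main() {
    fmt.Println(indexedFileName(42, ".csv", 20)) // 00000000000000000042.csv
}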
24 changes: 24 additions & 0 deletions docs/swagger/docs.go
@@ -1391,6 +1391,18 @@ var doc = `{
"encoder-concurrency": {
"type": "integer"
},
<<<<<<< HEAD
=======
"file-index-digit": {
"type": "integer"
},
"kafka-config": {
"$ref": "#/definitions/config.KafkaConfig"
},
"mysql-config": {
"$ref": "#/definitions/config.MySQLConfig"
},
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
"only-output-updated-columns": {
"type": "boolean"
},
@@ -2220,6 +2232,18 @@ var doc = `{
"encoder_concurrency": {
"type": "integer"
},
<<<<<<< HEAD
=======
"file_index_width": {
"type": "integer"
},
"kafka_config": {
"$ref": "#/definitions/v2.KafkaConfig"
},
"mysql_config": {
"$ref": "#/definitions/v2.MySQLConfig"
},
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
"only_output_updated_columns": {
"type": "boolean"
},
24 changes: 24 additions & 0 deletions docs/swagger/swagger.json
@@ -1372,6 +1372,18 @@
"encoder-concurrency": {
"type": "integer"
},
<<<<<<< HEAD
=======
"file-index-digit": {
"type": "integer"
},
"kafka-config": {
"$ref": "#/definitions/config.KafkaConfig"
},
"mysql-config": {
"$ref": "#/definitions/config.MySQLConfig"
},
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
"only-output-updated-columns": {
"type": "boolean"
},
@@ -2201,6 +2213,18 @@
"encoder_concurrency": {
"type": "integer"
},
<<<<<<< HEAD
=======
"file_index_width": {
"type": "integer"
},
"kafka_config": {
"$ref": "#/definitions/v2.KafkaConfig"
},
"mysql_config": {
"$ref": "#/definitions/v2.MySQLConfig"
},
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
"only_output_updated_columns": {
"type": "boolean"
},
18 changes: 18 additions & 0 deletions docs/swagger/swagger.yaml
@@ -63,6 +63,15 @@ definitions:
type: boolean
encoder-concurrency:
type: integer
<<<<<<< HEAD
=======
file-index-digit:
type: integer
kafka-config:
$ref: '#/definitions/config.KafkaConfig'
mysql-config:
$ref: '#/definitions/config.MySQLConfig'
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
only-output-updated-columns:
type: boolean
protocol:
@@ -619,6 +628,15 @@ definitions:
type: boolean
encoder_concurrency:
type: integer
<<<<<<< HEAD
=======
file_index_width:
type: integer
kafka_config:
$ref: '#/definitions/v2.KafkaConfig'
mysql_config:
$ref: '#/definitions/v2.MySQLConfig'
>>>>>>> 19277542cd (sink(ticdc): add index config to storage sink (#8918))
only_output_updated_columns:
type: boolean
protocol:
1 change: 1 addition & 0 deletions pkg/cmd/util/helper_test.go
@@ -218,6 +218,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
Terminator: "\r\n",
DateSeparator: "day",
EnablePartitionSeparator: true,
FileIndexWidth: config.DefaultFileIndexWidth,
CSVConfig: &config.CSVConfig{
Delimiter: ",",
Quote: "\"",
108 changes: 64 additions & 44 deletions pkg/config/sink.go
@@ -25,25 +25,22 @@ import (
"go.uber.org/zap"
)

// DefaultMaxMessageBytes sets the default value for max-message-bytes.
const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M

const (
// DefaultMaxMessageBytes sets the default value for max-message-bytes.
DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M

// TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI.
TxnAtomicityKey = "transaction-atomicity"
// defaultTxnAtomicity is the default atomicity level.
defaultTxnAtomicity = noneTxnAtomicity

// unknownTxnAtomicity is an invalid atomicity level and will be treated as
// defaultTxnAtomicity when initializing sink in processor.
unknownTxnAtomicity AtomicityLevel = ""
// noneTxnAtomicity means atomicity of transactions is not guaranteed
noneTxnAtomicity AtomicityLevel = "none"
// tableTxnAtomicity means atomicity of single table transactions is guaranteed.
tableTxnAtomicity AtomicityLevel = "table"
)

const (
// Comma is a constant for ','
Comma = ","
// CR is an abbreviation for carriage return
@@ -58,6 +55,13 @@ const (
Backslash = '\\'
// NULL is a constant for '\N'
NULL = "\\N"

// MinFileIndexWidth is the minimum width of file index.
MinFileIndexWidth = 6 // enough for 2^19 files
// MaxFileIndexWidth is the maximum width of file index.
MaxFileIndexWidth = 20 // enough for 2^64 files
// DefaultFileIndexWidth is the default width of file index.
DefaultFileIndexWidth = MaxFileIndexWidth
)
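
As a sanity check on the capacity comments above: six decimal digits can index up to 10^6 - 1 = 999,999 files, which covers 2^19 = 524,288, and twenty digits can index up to 10^20 - 1, which exceeds 2^64 (about 1.8 * 10^19); the maximum width therefore accommodates any uint64 file index.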

// AtomicityLevel represents the atomicity level of a changefeed.
@@ -109,6 +113,7 @@ type SinkConfig struct {
Terminator string `toml:"terminator" json:"terminator"`
DateSeparator string `toml:"date-separator" json:"date-separator"`
EnablePartitionSeparator bool `toml:"enable-partition-separator" json:"enable-partition-separator"`
FileIndexWidth int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"`

// If EnableKafkaSinkV2 is enabled, the kafka-go sink will be used.
EnableKafkaSinkV2 bool `toml:"enable-kafka-sink-v2" json:"enable-kafka-sink-v2"`
@@ -133,6 +138,42 @@ type CSVConfig struct {
IncludeCommitTs bool `toml:"include-commit-ts" json:"include-commit-ts"`
}

func (c *CSVConfig) validateAndAdjust() error {
if c == nil {
return nil
}

// validate quote
if len(c.Quote) > 1 {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote contains more than one character"))
}
if len(c.Quote) == 1 {
quote := c.Quote[0]
if quote == CR || quote == LF {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote cannot be line break character"))
}
}

// validate delimiter
if len(c.Delimiter) == 0 {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config delimiter cannot be empty"))
}
if strings.ContainsRune(c.Delimiter, CR) ||
strings.ContainsRune(c.Delimiter, LF) {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config delimiter contains line break characters"))
}
if len(c.Quote) > 0 && strings.Contains(c.Delimiter, c.Quote) {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote and delimiter cannot be the same"))
}

return nil
}
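
A self-contained sketch of one rule the relocated validator enforces; checkQuoteDelimiter reproduces the quote/delimiter collision check rather than calling the unexported method:

package main

import (
    "errors"
    "fmt"
    "strings"
)

// checkQuoteDelimiter mirrors the final check in CSVConfig.validateAndAdjust:
// a non-empty quote must not appear inside the delimiter.
func checkQuoteDelimiter(quote, delimiter string) error {
    if len(quote) > 0 && strings.Contains(delimiter, quote) {
        return errors.New("csv config quote and delimiter cannot be the same")
    }
    return nil
}

func main() {
    fmt.Println(checkQuoteDelimiter(`"`, `"`)) // rejected
    fmt.Println(checkQuoteDelimiter(`"`, ",")) // <nil>
}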

// DateSeparator specifies the date separator in storage destination path
type DateSeparator int

@@ -235,48 +276,27 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error {
s.Terminator = CRLF
}

// validate date separator
if len(s.DateSeparator) > 0 {
var separator DateSeparator
if err := separator.FromString(s.DateSeparator); err != nil {
return cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
// validate storage sink related config
if sinkURI != nil && sink.IsStorageScheme(sinkURI.Scheme) {
// validate date separator
if len(s.DateSeparator) > 0 {
var separator DateSeparator
if err := separator.FromString(s.DateSeparator); err != nil {
return cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
}
}
}

if s.CSVConfig != nil {
return s.validateAndAdjustCSVConfig()
}

return nil
}

func (s *SinkConfig) validateAndAdjustCSVConfig() error {
// validate quote
if len(s.CSVConfig.Quote) > 1 {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote contains more than one character"))
}
if len(s.CSVConfig.Quote) == 1 {
quote := s.CSVConfig.Quote[0]
if quote == CR || quote == LF {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote cannot be line break character"))
// FileIndexWidth must fall within [MinFileIndexWidth, MaxFileIndexWidth].
// In most scenarios users do not need to change this configuration, so
// out-of-range values are silently adjusted to the default here instead
// of being reported as errors.
if s.FileIndexWidth < MinFileIndexWidth || s.FileIndexWidth > MaxFileIndexWidth {
s.FileIndexWidth = DefaultFileIndexWidth
}
}

// validate delimiter
if len(s.CSVConfig.Delimiter) == 0 {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config delimiter cannot be empty"))
}
if strings.ContainsRune(s.CSVConfig.Delimiter, CR) ||
strings.ContainsRune(s.CSVConfig.Delimiter, LF) {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config delimiter contains line break characters"))
}
if len(s.CSVConfig.Quote) > 0 && strings.Contains(s.CSVConfig.Delimiter, s.CSVConfig.Quote) {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote and delimiter cannot be the same"))
if err := s.CSVConfig.validateAndAdjust(); err != nil {
return err
}
}

return nil
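
Taken together, the width handling reduces to this self-contained sketch (constants copied from the hunk above):

package main

import "fmt"

const (
    MinFileIndexWidth     = 6  // enough for 2^19 files
    MaxFileIndexWidth     = 20 // enough for 2^64 files
    DefaultFileIndexWidth = MaxFileIndexWidth
)

// normalizeFileIndexWidth mirrors the silent adjustment in
// validateAndAdjust: out-of-range widths fall back to the default.
func normalizeFileIndexWidth(w int) int {
    if w < MinFileIndexWidth || w > MaxFileIndexWidth {
        return DefaultFileIndexWidth
    }
    return w
}

func main() {
    fmt.Println(normalizeFileIndexWidth(0))  // 20 (unset in older configs)
    fmt.Println(normalizeFileIndexWidth(6))  // 6
    fmt.Println(normalizeFileIndexWidth(99)) // 20
}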
[The remaining seven changed files were not loaded in this view.]