
sink(ticdc): add index config to storage sink (#8918)
close #8919
CharlesCheung96 committed May 10, 2023
1 parent 265cc9c commit 98c663a
Showing 16 changed files with 224 additions and 102 deletions.
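The change threads a single new knob, FileIndexWidth, from the user-facing sink configuration (`file-index-digit` in TOML, `file_index_width` in the open API) down to the cloud-storage sink and the storage consumer. A minimal usage sketch of the resulting behavior, pieced together from the tests in this diff — the import path is assumed from the tiflow repository layout:

package main

import (
	"fmt"
	"net/url"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	// The width check only applies to storage sink URIs (s3, gcs, ...).
	sinkURI, err := url.Parse("s3://bucket/prefix?protocol=csv")
	if err != nil {
		panic(err)
	}

	cfg := config.GetDefaultReplicaConfig()
	cfg.Sink.FileIndexWidth = 3 // below MinFileIndexWidth (6)

	// Out-of-range widths are silently reset to DefaultFileIndexWidth (20).
	if err := cfg.ValidateAndAdjust(sinkURI); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Sink.FileIndexWidth) // prints 20
}

Values inside [6, 20] are kept as-is; TestValidateAndAdjustStorageConfig below checks exactly this by setting 16 and expecting 16 back.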
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
@@ -281,6 +281,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
Terminator: c.Sink.Terminator,
DateSeparator: c.Sink.DateSeparator,
EnablePartitionSeparator: c.Sink.EnablePartitionSeparator,
FileIndexWidth: c.Sink.FileIndexWidth,
EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
}
@@ -400,6 +401,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
Terminator: cloned.Sink.Terminator,
DateSeparator: cloned.Sink.DateSeparator,
EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator,
FileIndexWidth: cloned.Sink.FileIndexWidth,
EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
}
@@ -546,6 +548,7 @@ type SinkConfig struct {
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
FileIndexWidth int `json:"file_index_width"`
EnableKafkaSinkV2 bool `json:"enable_kafka_sink_v_2"`
OnlyOutputUpdatedColumns bool `json:"only_output_updated_columns"`
}
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
@@ -97,7 +97,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()

replicaConfig.Sink.FileIndexWidth = 6
errCh := make(chan error, 5)
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
@@ -172,6 +172,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()
replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String()
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
1 change: 1 addition & 0 deletions cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
@@ -46,6 +46,7 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
require.Nil(t, err)
cfg := cloudstorage.NewConfig()
err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig())
cfg.FileIndexWidth = 6
require.Nil(t, err)

statistics := metrics.NewStatistics(ctx, sink.TxnSink)
5 changes: 4 additions & 1 deletion cmd/storage-consumer/main.go
@@ -62,6 +62,7 @@ var (
logFile string
logLevel string
flushInterval time.Duration
fileIndexWidth int
enableProfiling bool
timezone string
)
@@ -79,6 +80,8 @@ func init() {
flag.StringVar(&logFile, "log-file", "", "log file path")
flag.StringVar(&logLevel, "log-level", "info", "log level")
flag.DurationVar(&flushInterval, "flush-interval", 10*time.Second, "flush interval")
flag.IntVar(&fileIndexWidth, "file-index-width",
config.DefaultFileIndexWidth, "file index width")
flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling")
flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer")
flag.Parse()
@@ -393,7 +396,7 @@ func (c *consumer) syncExecDMLEvents(
key cloudstorage.DmlPathKey,
fileIdx uint64,
) error {
filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension)
filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension, fileIndexWidth)
log.Debug("read from dml file path", zap.String("path", filePath))
content, err := c.externalStorage.ReadFile(ctx, filePath)
if err != nil {
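GenerateDMLFilePath now takes the index width, so the consumer pads file indexes with the same number of digits as the changefeed that wrote them; otherwise it would look for CDC000001.csv while the sink produced CDC00000000000000000001.csv. The body of GenerateDMLFilePath is not part of this diff, so the following is only a hypothetical sketch of the zero-padding involved:

package main

import "fmt"

// dmlFileName is a hypothetical stand-in for the width-aware naming done by
// GenerateDMLFilePath; only the zero-padding idea is illustrated here.
func dmlFileName(idx uint64, extension string, fileIndexWidth int) string {
	return fmt.Sprintf("CDC%0*d%s", fileIndexWidth, idx, extension)
}

func main() {
	fmt.Println(dmlFileName(1, ".csv", 6))  // CDC000001.csv
	fmt.Println(dmlFileName(1, ".csv", 20)) // CDC00000000000000000001.csv
}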
6 changes: 6 additions & 0 deletions docs/swagger/docs.go
@@ -1391,6 +1391,9 @@ var doc = `{
"encoder-concurrency": {
"type": "integer"
},
"file-index-digit": {
"type": "integer"
},
"only-output-updated-columns": {
"type": "boolean"
},
@@ -2220,6 +2223,9 @@ var doc = `{
"encoder_concurrency": {
"type": "integer"
},
"file_index_width": {
"type": "integer"
},
"only_output_updated_columns": {
"type": "boolean"
},
6 changes: 6 additions & 0 deletions docs/swagger/swagger.json
@@ -1372,6 +1372,9 @@
"encoder-concurrency": {
"type": "integer"
},
"file-index-digit": {
"type": "integer"
},
"only-output-updated-columns": {
"type": "boolean"
},
@@ -2201,6 +2204,9 @@
"encoder_concurrency": {
"type": "integer"
},
"file_index_width": {
"type": "integer"
},
"only_output_updated_columns": {
"type": "boolean"
},
4 changes: 4 additions & 0 deletions docs/swagger/swagger.yaml
@@ -63,6 +63,8 @@ definitions:
type: boolean
encoder-concurrency:
type: integer
file-index-digit:
type: integer
only-output-updated-columns:
type: boolean
protocol:
@@ -619,6 +621,8 @@ definitions:
type: boolean
encoder_concurrency:
type: integer
file_index_width:
type: integer
only_output_updated_columns:
type: boolean
protocol:
1 change: 1 addition & 0 deletions pkg/cmd/util/helper_test.go
@@ -228,6 +228,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
Terminator: "\r\n",
DateSeparator: "day",
EnablePartitionSeparator: true,
FileIndexWidth: config.DefaultFileIndexWidth,
CSVConfig: &config.CSVConfig{
Delimiter: ",",
Quote: "\"",
108 changes: 64 additions & 44 deletions pkg/config/sink.go
@@ -25,25 +25,22 @@ import (
"go.uber.org/zap"
)

// DefaultMaxMessageBytes sets the default value for max-message-bytes.
const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M

const (
// DefaultMaxMessageBytes sets the default value for max-message-bytes.
DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M

// TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI.
TxnAtomicityKey = "transaction-atomicity"
// defaultTxnAtomicity is the default atomicity level.
defaultTxnAtomicity = noneTxnAtomicity

// unknownTxnAtomicity is an invalid atomicity level and will be treated as
// defaultTxnAtomicity when initializing sink in processor.
unknownTxnAtomicity AtomicityLevel = ""
// noneTxnAtomicity means atomicity of transactions is not guaranteed
noneTxnAtomicity AtomicityLevel = "none"
// tableTxnAtomicity means atomicity of single table transactions is guaranteed.
tableTxnAtomicity AtomicityLevel = "table"
)

const (
// Comma is a constant for ','
Comma = ","
// CR is an abbreviation for carriage return
@@ -58,6 +55,13 @@ const (
Backslash = '\\'
// NULL is a constant for '\N'
NULL = "\\N"

// MinFileIndexWidth is the minimum width of file index.
MinFileIndexWidth = 6 // enough for 2^19 files
// MaxFileIndexWidth is the maximum width of file index.
MaxFileIndexWidth = 20 // enough for 2^64 files
// DefaultFileIndexWidth is the default width of file index.
DefaultFileIndexWidth = MaxFileIndexWidth
)

// AtomicityLevel represents the atomicity level of a changefeed.
@@ -109,6 +113,7 @@ type SinkConfig struct {
Terminator string `toml:"terminator" json:"terminator"`
DateSeparator string `toml:"date-separator" json:"date-separator"`
EnablePartitionSeparator bool `toml:"enable-partition-separator" json:"enable-partition-separator"`
FileIndexWidth int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"`

// EnableKafkaSinkV2 enabled then the kafka-go sink will be used.
EnableKafkaSinkV2 bool `toml:"enable-kafka-sink-v2" json:"enable-kafka-sink-v2"`
@@ -133,6 +138,42 @@ type CSVConfig struct {
IncludeCommitTs bool `toml:"include-commit-ts" json:"include-commit-ts"`
}

func (c *CSVConfig) validateAndAdjust() error {
if c == nil {
return nil
}

// validate quote
if len(c.Quote) > 1 {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote contains more than one character"))
}
if len(c.Quote) == 1 {
quote := c.Quote[0]
if quote == CR || quote == LF {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote cannot be line break character"))
}
}

// validate delimiter
if len(c.Delimiter) == 0 {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config delimiter cannot be empty"))
}
if strings.ContainsRune(c.Delimiter, CR) ||
strings.ContainsRune(c.Delimiter, LF) {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config delimiter contains line break characters"))
}
if len(c.Quote) > 0 && strings.Contains(c.Delimiter, c.Quote) {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote and delimiter cannot be the same"))
}

return nil
}

// DateSeparator specifies the date separator in storage destination path
type DateSeparator int

@@ -235,48 +276,27 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error {
s.Terminator = CRLF
}

// validate date separator
if len(s.DateSeparator) > 0 {
var separator DateSeparator
if err := separator.FromString(s.DateSeparator); err != nil {
return cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
// validate storage sink related config
if sinkURI != nil && sink.IsStorageScheme(sinkURI.Scheme) {
// validate date separator
if len(s.DateSeparator) > 0 {
var separator DateSeparator
if err := separator.FromString(s.DateSeparator); err != nil {
return cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
}
}
}

if s.CSVConfig != nil {
return s.validateAndAdjustCSVConfig()
}

return nil
}

func (s *SinkConfig) validateAndAdjustCSVConfig() error {
// validate quote
if len(s.CSVConfig.Quote) > 1 {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote contains more than one character"))
}
if len(s.CSVConfig.Quote) == 1 {
quote := s.CSVConfig.Quote[0]
if quote == CR || quote == LF {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote cannot be line break character"))
// File index width should be in [minFileIndexWidth, maxFileIndexWidth].
// In most scenarios, the user does not need to change this configuration,
// so the default value of this parameter is not set and just make silent
// adjustments here.
if s.FileIndexWidth < MinFileIndexWidth || s.FileIndexWidth > MaxFileIndexWidth {
s.FileIndexWidth = DefaultFileIndexWidth
}
}

// validate delimiter
if len(s.CSVConfig.Delimiter) == 0 {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config delimiter cannot be empty"))
}
if strings.ContainsRune(s.CSVConfig.Delimiter, CR) ||
strings.ContainsRune(s.CSVConfig.Delimiter, LF) {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config delimiter contains line break characters"))
}
if len(s.CSVConfig.Quote) > 0 && strings.Contains(s.CSVConfig.Delimiter, s.CSVConfig.Quote) {
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New("csv config quote and delimiter cannot be the same"))
if err := s.CSVConfig.validateAndAdjust(); err != nil {
return err
}
}

return nil
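The bounds on the new constants can be sanity-checked against their comments: six decimal digits cover 10^6 indexes, which exceeds 2^19, and the largest uint64 value has exactly 20 decimal digits. A standalone check, assuming nothing beyond the Go standard library:

package main

import (
	"fmt"
	"math"
)

func main() {
	// MinFileIndexWidth = 6: 10^6 possible indexes, more than 2^19 = 524288.
	fmt.Println(1<<19 < 1_000_000) // true

	// MaxFileIndexWidth = 20: math.MaxUint64 renders as 20 decimal digits,
	// so a width of 20 accommodates any index up to 2^64 - 1.
	fmt.Println(len(fmt.Sprint(uint64(math.MaxUint64)))) // 20
}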
30 changes: 26 additions & 4 deletions pkg/config/sink_test.go
@@ -260,7 +260,6 @@ func TestApplyParameterBySinkURI(t *testing.T) {
} else {
require.ErrorContains(t, err, tc.expectedErr)
}

}
}

@@ -333,6 +332,7 @@ func TestCheckCompatibilityWithSinkURI(t *testing.T) {
}

func TestValidateAndAdjustCSVConfig(t *testing.T) {
t.Parallel()
tests := []struct {
name string
config *CSVConfig
@@ -393,16 +393,38 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) {
wantErr: "csv config quote and delimiter cannot be the same",
},
}
for _, tc := range tests {
for _, c := range tests {
tc := c
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
s := &SinkConfig{
CSVConfig: tc.config,
}
if tc.wantErr == "" {
require.Nil(t, s.validateAndAdjustCSVConfig())
require.Nil(t, s.CSVConfig.validateAndAdjust())
} else {
require.Regexp(t, tc.wantErr, s.validateAndAdjustCSVConfig())
require.Regexp(t, tc.wantErr, s.CSVConfig.validateAndAdjust())
}
})
}
}

func TestValidateAndAdjustStorageConfig(t *testing.T) {
t.Parallel()

sinkURI, err := url.Parse("s3://bucket?protocol=csv")
require.NoError(t, err)
s := GetDefaultReplicaConfig()
err = s.ValidateAndAdjust(sinkURI)
require.NoError(t, err)
require.Equal(t, DefaultFileIndexWidth, s.Sink.FileIndexWidth)

err = s.ValidateAndAdjust(sinkURI)
require.NoError(t, err)
require.Equal(t, DefaultFileIndexWidth, s.Sink.FileIndexWidth)

s.Sink.FileIndexWidth = 16
err = s.ValidateAndAdjust(sinkURI)
require.NoError(t, err)
require.Equal(t, 16, s.Sink.FileIndexWidth)
}
2 changes: 2 additions & 0 deletions pkg/sink/cloudstorage/config.go
@@ -52,6 +52,7 @@ type Config struct {
WorkerCount int
FlushInterval time.Duration
FileSize int
FileIndexWidth int
DateSeparator string
EnablePartitionSeparator bool
}
@@ -96,6 +97,7 @@ func (c *Config) Apply(

c.DateSeparator = replicaConfig.Sink.DateSeparator
c.EnablePartitionSeparator = replicaConfig.Sink.EnablePartitionSeparator
c.FileIndexWidth = replicaConfig.Sink.FileIndexWidth

return nil
}
9 changes: 7 additions & 2 deletions pkg/sink/cloudstorage/config_test.go
@@ -28,13 +28,18 @@ func TestConfigApply(t *testing.T) {
expected.WorkerCount = 32
expected.FlushInterval = 10 * time.Second
expected.FileSize = 16 * 1024 * 1024
expected.FileIndexWidth = config.DefaultFileIndexWidth
expected.DateSeparator = config.DateSeparatorNone.String()
expected.EnablePartitionSeparator = true
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216"
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv"
sinkURI, err := url.Parse(uri)
require.Nil(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)
cfg := NewConfig()
err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig())
err = cfg.Apply(context.TODO(), sinkURI, replicaConfig)
require.Nil(t, err)
require.Equal(t, expected, cfg)
}