
Commit

fix lint (#1)
CharlesCheung96 committed May 9, 2023
1 parent 3d05ec9 commit d9b7731
Showing 6 changed files with 60 additions and 15 deletions.
5 changes: 4 additions & 1 deletion cmd/storage-consumer/main.go
@@ -62,6 +62,7 @@ var (
logFile string
logLevel string
flushInterval time.Duration
fileIndexWidth int
enableProfiling bool
timezone string
)
@@ -79,6 +80,8 @@ func init() {
flag.StringVar(&logFile, "log-file", "", "log file path")
flag.StringVar(&logLevel, "log-level", "info", "log level")
flag.DurationVar(&flushInterval, "flush-interval", 10*time.Second, "flush interval")
flag.IntVar(&fileIndexWidth, "file-index-width",
config.DefaultFileIndexWidth, "file index width")
flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling")
flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer")
flag.Parse()
@@ -397,7 +400,7 @@ func (c *consumer) syncExecDMLEvents(
key cloudstorage.DmlPathKey,
fileIdx uint64,
) error {
filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension)
filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension, fileIndexWidth)
log.Debug("read from dml file path", zap.String("path", filePath))
content, err := c.externalStorage.ReadFile(ctx, filePath)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/config/sink.go
@@ -58,7 +58,7 @@ const (

// MinFileIndexWidth is the minimum width of file index.
MinFileIndexWidth = 6 // enough for 2^19 files
// MinFileIndexWidth is the maximum width of file index.
// MaxFileIndexWidth is the maximum width of file index.
MaxFileIndexWidth = 20 // enough for 2^64 files
// DefaultFileIndexWidth is the default width of file index.
DefaultFileIndexWidth = MaxFileIndexWidth
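
Aside (not part of the commit): the width constants in this hunk can be sanity-checked directly. 2^19 = 524,288 fits in 6 decimal digits, and 2^64-1 = 18,446,744,073,709,551,615 fits in 20. A minimal Go sketch, illustrative only:

package main

import (
	"fmt"
	"math"
)

func main() {
	fmt.Println(len(fmt.Sprintf("%d", 1<<19)))                  // 6 digits for 2^19
	fmt.Println(len(fmt.Sprintf("%d", uint64(math.MaxUint64)))) // 20 digits for 2^64-1
}
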
4 changes: 3 additions & 1 deletion pkg/config/sink_test.go
@@ -393,8 +393,10 @@ func TestValidateAndAdjustCSVConfig(t *testing.T) {
wantErr: "csv config quote and delimiter cannot be the same",
},
}
for _, tc := range tests {
for _, c := range tests {
tc := c
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
s := &SinkConfig{
CSVConfig: tc.config,
}
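
Note on the test change above (mine, not from the commit): the tc := c copy matters because, prior to Go 1.22, a range loop reuses a single loop variable across iterations, so parallel subtests that capture it directly may all observe the last test case. The same idiom outside the testing package, as a minimal sketch:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for _, v := range []string{"a", "b", "c"} {
		v := v // per-iteration copy, same idea as tc := c above
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(v) // each goroutine prints its own value
		}()
	}
	wg.Wait()
}
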
18 changes: 9 additions & 9 deletions pkg/sink/cloudstorage/path.go
@@ -54,8 +54,8 @@ func IsSchemaFile(path string) bool {
return schemaRE.MatchString(path)
}

// MustParseSchemaName parses the version from the schema file name.
func MustParseSchemaName(path string) (uint64, uint32) {
// mustParseSchemaName parses the version from the schema file name.
func mustParseSchemaName(path string) (uint64, uint32) {
reportErr := func(err error) {
log.Panic("failed to parse schema file name",
zap.String("schemaPath", path),
@@ -99,8 +99,8 @@ func generateSchemaFilePath(
schema, table, tableVersion, checksum)
}

func generateDataFileName(index uint64, extension string, config *Config) string {
indexFmt := "%0" + strconv.Itoa(config.FileIndexWidth) + "d"
func generateDataFileName(index uint64, extension string, fileIndexWidth int) string {
indexFmt := "%0" + strconv.Itoa(fileIndexWidth) + "d"
return fmt.Sprintf("CDC"+indexFmt+"%s", index, extension)
}

@@ -185,7 +185,7 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
}

// walk the table meta path to find the last schema file
_, checksum := MustParseSchemaName(tblSchemaFile)
_, checksum := mustParseSchemaName(tblSchemaFile)
schemaFileCnt := 0
lastVersion := uint64(0)
prefix := fmt.Sprintf(tableSchemaPrefix+"schema_", def.Schema, def.Table)
@@ -196,7 +196,7 @@
if !strings.HasSuffix(path, checksumSuffix) {
return nil
}
version, parsedChecksum := MustParseSchemaName(path)
version, parsedChecksum := mustParseSchemaName(path)
if parsedChecksum != checksum {
// TODO: parsedChecksum should be ignored, remove this panic
// after the new path protocol is verified.
@@ -319,7 +319,7 @@ func (f *FilePathGenerator) generateDataFileName(
f.fileIndex[tbl].index = 0
}
f.fileIndex[tbl].index++
return generateDataFileName(f.fileIndex[tbl].index, f.extension, f.config), nil
return generateDataFileName(f.fileIndex[tbl].index, f.extension, f.config.FileIndexWidth), nil
}

func (f *FilePathGenerator) getNextFileIdxFromIndexFile(
@@ -345,8 +345,8 @@
}

lastFilePath := strings.Join([]string{
f.generateDataDirPath(tbl, date), // file dir
generateDataFileName(maxFileIdx, f.extension, f.config), // file name
f.generateDataDirPath(tbl, date), // file dir
generateDataFileName(maxFileIdx, f.extension, f.config.FileIndexWidth), // file name
}, "/")

var lastFileExists, lastFileIsEmpty bool
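
For reference, a standalone sketch of the padded naming scheme that generateDataFileName now implements (illustrative only, not repository code): the index is zero-padded to the configured width, so a width of 6 reproduces the old CDC000010.csv style, while the default width of 20 yields CDC00000000000000000010.csv.

package main

import (
	"fmt"
	"strconv"
)

// dataFileName mirrors the format logic shown above: "CDC" + zero-padded index + extension.
func dataFileName(index uint64, extension string, width int) string {
	indexFmt := "%0" + strconv.Itoa(width) + "d"
	return fmt.Sprintf("CDC"+indexFmt+"%s", index, extension)
}

func main() {
	fmt.Println(dataFileName(10, ".csv", 6))  // CDC000010.csv
	fmt.Println(dataFileName(10, ".csv", 20)) // CDC00000000000000000010.csv
}
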
8 changes: 5 additions & 3 deletions pkg/sink/cloudstorage/path_key.go
@@ -58,7 +58,7 @@ func (s *SchemaPathKey) ParseSchemaFilePath(path string) (uint32, error) {
}

schemaFileName := matches[len(matches)-1]
version, checksum := MustParseSchemaName(schemaFileName)
version, checksum := mustParseSchemaName(schemaFileName)

*s = SchemaPathKey{
Schema: schema,
@@ -76,7 +76,9 @@ type DmlPathKey struct {
}

// GenerateDMLFilePath generates the dml file path.
func (d *DmlPathKey) GenerateDMLFilePath(idx uint64, extension string) string {
func (d *DmlPathKey) GenerateDMLFilePath(
idx uint64, extension string, fileIndexWidth int,
) string {
var elems []string

elems = append(elems, d.Schema)
@@ -89,7 +91,7 @@ func (d *DmlPathKey) GenerateDMLFilePath(idx uint64, extension string) string {
if len(d.Date) != 0 {
elems = append(elems, d.Date)
}
elems = append(elems, fmt.Sprintf("CDC%06d%s", idx, extension))
elems = append(elems, generateDataFileName(idx, extension, fileIndexWidth))

return strings.Join(elems, "/")
}
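
Illustration (not part of the commit): with GenerateDMLFilePath delegating to generateDataFileName, a DML path is the schema, table, and table version, followed by the optional partition and date components and the padded file name, joined with "/". A minimal sketch matching the expected path in the new test below:

package main

import (
	"fmt"
	"strings"
)

func main() {
	fileName := fmt.Sprintf("CDC%020d%s", 10, ".csv") // default index width of 20
	elems := []string{"schema1", "table1", "123456", "2023-05-09", fileName}
	fmt.Println(strings.Join(elems, "/"))
	// schema1/table1/123456/2023-05-09/CDC00000000000000000010.csv
}
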
38 changes: 38 additions & 0 deletions pkg/sink/cloudstorage/path_key_test.go
@@ -56,3 +56,41 @@ func TestSchemaPathKey(t *testing.T) {
require.Equal(t, tc.checksum, checksum)
}
}

func TestDmlPathKey(t *testing.T) {
t.Parallel()

testCases := []struct {
index int
fileIndexWidth int
extension string
path string
dmlkey DmlPathKey
}{
{
index: 10,
fileIndexWidth: 20,
extension: ".csv",
path: "schema1/table1/123456/2023-05-09/CDC00000000000000000010.csv",
dmlkey: DmlPathKey{
SchemaPathKey: SchemaPathKey{
Schema: "schema1",
Table: "table1",
TableVersion: 123456,
},
PartitionNum: 0,
Date: "2023-05-09",
},
},
}

for _, tc := range testCases {
var dmlkey DmlPathKey
idx, err := dmlkey.ParseDMLFilePath("day", tc.path)
require.NoError(t, err)
require.Equal(t, tc.dmlkey, dmlkey)

fileName := dmlkey.GenerateDMLFilePath(idx, tc.extension, tc.fileIndexWidth)
require.Equal(t, tc.path, fileName)
}
}
