From 572a4e9df83eec5e29f216cd3dc56d0f36929143 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Tue, 9 May 2023 12:36:57 +0800 Subject: [PATCH] sink(ticdc): change the directory of storage sink only when ddl event occurs (#8881) close pingcap/tiflow#8890, close pingcap/tiflow#8891 --- Makefile | 3 + cdc/owner/schema.go | 6 +- .../cloudstorage/cloud_storage_ddl_sink.go | 22 +- .../cloud_storage_ddl_sink_test.go | 7 +- .../cloud_storage_dml_sink_test.go | 144 ++++---- .../eventsink/cloudstorage/dml_worker.go | 39 +-- .../eventsink/cloudstorage/dml_worker_test.go | 4 +- cmd/storage-consumer/main.go | 317 +++++++----------- pkg/sink/cloudstorage/main_test.go | 24 ++ pkg/sink/cloudstorage/path.go | 179 +++++++++- pkg/sink/cloudstorage/path_key.go | 154 +++++++++ pkg/sink/cloudstorage/path_key_test.go | 58 ++++ pkg/sink/cloudstorage/path_test.go | 45 +++ pkg/sink/cloudstorage/table_definition.go | 94 +++++- .../cloudstorage/table_definition_test.go | 99 ++++-- .../canal_json_storage_basic/run.sh | 1 - .../data/prepare.sql | 9 +- .../canal_json_storage_partition_table/run.sh | 2 - .../csv_storage_basic/run.sh | 1 - .../csv_storage_multi_tables_ddl/run.sh | 1 - .../data/prepare.sql | 9 +- .../csv_storage_partition_table/run.sh | 2 - 22 files changed, 828 insertions(+), 392 deletions(-) create mode 100644 pkg/sink/cloudstorage/main_test.go create mode 100644 pkg/sink/cloudstorage/path_key.go create mode 100644 pkg/sink/cloudstorage/path_key_test.go diff --git a/Makefile b/Makefile index 8c2a34000ac..295bf02c0e6 100644 --- a/Makefile +++ b/Makefile @@ -218,6 +218,9 @@ build_mysql_integration_test_images: clean_integration_test_containers integration_test_kafka: check_third_party_binary tests/integration_tests/run.sh kafka "$(CASE)" "$(START_AT)" +integration_test_storage: + tests/integration_tests/run.sh storage "$(CASE)" "$(START_AT)" + kafka_docker_integration_test: ## Run TiCDC Kafka all integration tests in Docker. kafka_docker_integration_test: clean_integration_test_containers docker-compose -f $(TICDC_DOCKER_DEPLOYMENTS_DIR)/docker-compose-kafka-integration.yml up diff --git a/cdc/owner/schema.go b/cdc/owner/schema.go index 2a8b2069658..19f561369ca 100644 --- a/cdc/owner/schema.go +++ b/cdc/owner/schema.go @@ -221,10 +221,12 @@ func (s *schemaWrap4Owner) BuildDDLEvents( if job.BinlogInfo != nil && job.BinlogInfo.TableInfo != nil { tableInfo = model.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.FinishedTS, job.BinlogInfo.TableInfo) } else { - // for an invalid DDL job or a DDL job that does not contain TableInfo, - // just retrieve the schema name. + // Just retrieve the schema name for a DDL job that does not contain TableInfo. + // Currently supported by cdc are: ActionCreateSchema, ActionDropSchema, + // and ActionModifySchemaCharsetAndCollate. 
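+		// The synthesized table info below also records the DDL's FinishedTS as its
+		// version, so sinks that derive storage paths from the table info version
+		// (such as the storage sink) still see a valid table version for these jobs.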
tableInfo = &model.TableInfo{ TableName: model.TableName{Schema: job.SchemaName}, + Version: job.BinlogInfo.FinishedTS, } } event.FromJob(job, preTableInfo, tableInfo) diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 57a070f82a1..27390849901 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -16,10 +16,10 @@ package cloudstorage import ( "context" "encoding/json" - "fmt" "net/url" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" ) // Assert DDLEventSink implementation @@ -58,24 +59,21 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er return d, nil } -func generateSchemaPath(def cloudstorage.TableDefinition) string { - return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion) -} - +// WriteDDLEvent writes the ddl event to the cloud storage. func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { var def cloudstorage.TableDefinition - - if ddl.TableInfo.TableInfo == nil { - return nil - } - def.FromDDLEvent(ddl) - encodedDef, err := json.MarshalIndent(def, "", " ") + encodedDef, err := def.MarshalWithQuery() if err != nil { return errors.Trace(err) } - path := generateSchemaPath(def) + path, err := def.GenerateSchemaFilePath() + if err != nil { + return errors.Trace(err) + } + log.Debug("write ddl event to external storage", + zap.String("path", path), zap.Any("ddl", ddl)) err = d.statistics.RecordDDLExecution(func() error { err1 := d.storage.WriteFile(ctx, path, encodedDef) if err1 != nil { diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index 112ed54a600..e8703c2c674 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -63,12 +63,11 @@ func TestWriteDDLEvent(t *testing.T) { }, }, } - tableDir := path.Join(parentDir, "test/table1/100") - os.MkdirAll(tableDir, 0o755) + tableDir := path.Join(parentDir, "test/table1/meta/") err = sink.WriteDDLEvent(ctx, ddlEvent) require.Nil(t, err) - tableSchema, err := os.ReadFile(path.Join(tableDir, "schema.json")) + tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json")) require.Nil(t, err) require.JSONEq(t, `{ "Table": "table1", @@ -124,8 +123,6 @@ func TestWriteCheckpointTs(t *testing.T) { }, }, } - table1Dir := path.Join(parentDir, "test/table1/100") - os.MkdirAll(table1Dir, 0o755) err = sink.WriteCheckpointTs(ctx, 100, tables) require.Nil(t, err) diff --git a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go index e2a8646871a..fb779a22d98 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go +++ b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go @@ -25,8 +25,9 @@ import ( timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" + "github.com/pingcap/tidb/util/rowcodec" 
"github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" + dmlsink "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" @@ -39,16 +40,34 @@ func setClock(s *dmlSink, clock clock.Clock) { } } +func getTableFiles(t *testing.T, tableDir string) []string { + files, err := os.ReadDir(tableDir) + require.Nil(t, err) + + fileNames := []string{} + for _, f := range files { + fileName := f.Name() + if f.IsDir() { + metaFiles, err := os.ReadDir(path.Join(tableDir, f.Name())) + require.Nil(t, err) + require.Len(t, metaFiles, 1) + fileName = metaFiles[0].Name() + } + fileNames = append(fileNames, fileName) + } + return fileNames +} + func generateTxnEvents( cnt *uint64, batch int, tableStatus *state.TableSinkState, -) []*eventsink.TxnCallbackableEvent { +) []*dmlsink.TxnCallbackableEvent { // assume we have a large transaction and it is splitted into 10 small transactions - txns := make([]*eventsink.TxnCallbackableEvent, 0, 10) + txns := make([]*dmlsink.TxnCallbackableEvent, 0, 10) for i := 0; i < 10; i++ { - txn := &eventsink.TxnCallbackableEvent{ + txn := &dmlsink.TxnCallbackableEvent{ Event: &model.SingleTableTxn{ CommitTs: 100, Table: &model.TableName{Schema: "test", Table: "table1"}, @@ -80,6 +99,10 @@ func generateTxnEvents( {Name: "c1", Value: i*batch + j}, {Name: "c2", Value: "hello world"}, }, + ColInfos: []rowcodec.ColInfo{ + {ID: 1, Ft: types.NewFieldType(mysql.TypeLong)}, + {ID: 2, Ft: types.NewFieldType(mysql.TypeVarchar)}, + }, } txn.Event.Rows = append(txn.Event.Rows, row) } @@ -99,8 +122,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { require.Nil(t, err) replicaConfig := config.GetDefaultReplicaConfig() - replicaConfig.Sink.Protocol = config.ProtocolOpen.String() - + replicaConfig.Sink.Protocol = config.ProtocolCsv.String() errCh := make(chan error, 5) s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh) require.Nil(t, err) @@ -110,26 +132,26 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { // generating one dml file. txns := generateTxnEvents(&cnt, batch, &tableStatus) - tableDir := path.Join(parentDir, "test/table1/33") err = s.WriteEvents(txns...) require.Nil(t, err) time.Sleep(3 * time.Second) - files, err := os.ReadDir(tableDir) + metaDir := path.Join(parentDir, "test/table1/meta") + files, err := os.ReadDir(metaDir) require.Nil(t, err) - require.Len(t, files, 3) - var fileNames []string - for _, f := range files { - fileNames = append(fileNames, f.Name()) - } - require.ElementsMatch(t, []string{"CDC000001.json", "schema.json", "CDC.index"}, fileNames) - content, err := os.ReadFile(path.Join(tableDir, "CDC000001.json")) + require.Len(t, files, 1) + + tableDir := path.Join(parentDir, "test/table1/33") + fileNames := getTableFiles(t, tableDir) + require.Len(t, fileNames, 2) + require.ElementsMatch(t, []string{"CDC000001.csv", "CDC.index"}, fileNames) + content, err := os.ReadFile(path.Join(tableDir, "CDC000001.csv")) require.Nil(t, err) require.Greater(t, len(content), 0) - content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index")) require.Nil(t, err) - require.Equal(t, "CDC000001.json\n", string(content)) + require.Equal(t, "CDC000001.csv\n", string(content)) require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt)) // generating another dml file. 
@@ -137,24 +159,18 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { require.Nil(t, err) time.Sleep(3 * time.Second) - files, err = os.ReadDir(tableDir) - require.Nil(t, err) - require.Len(t, files, 4) - fileNames = nil - for _, f := range files { - fileNames = append(fileNames, f.Name()) - } + fileNames = getTableFiles(t, tableDir) + require.Len(t, fileNames, 3) require.ElementsMatch(t, []string{ - "CDC000001.json", "CDC000002.json", - "schema.json", "CDC.index", + "CDC000001.csv", "CDC000002.csv", "CDC.index", }, fileNames) - content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json")) + content, err = os.ReadFile(path.Join(tableDir, "CDC000002.csv")) require.Nil(t, err) require.Greater(t, len(content), 0) - content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index")) require.Nil(t, err) - require.Equal(t, "CDC000002.json\n", string(content)) + require.Equal(t, "CDC000002.csv\n", string(content)) require.Equal(t, uint64(2000), atomic.LoadUint64(&cnt)) cancel() @@ -171,7 +187,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { require.Nil(t, err) replicaConfig := config.GetDefaultReplicaConfig() - replicaConfig.Sink.Protocol = config.ProtocolOpen.String() + replicaConfig.Sink.Protocol = config.ProtocolCsv.String() replicaConfig.Sink.DateSeparator = config.DateSeparatorDay.String() errCh := make(chan error, 5) @@ -191,21 +207,16 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { require.Nil(t, err) time.Sleep(3 * time.Second) - files, err := os.ReadDir(tableDir) - require.Nil(t, err) - require.Len(t, files, 2) - var fileNames []string - for _, f := range files { - fileNames = append(fileNames, f.Name()) - } - require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames) - content, err := os.ReadFile(path.Join(tableDir, "CDC000001.json")) + fileNames := getTableFiles(t, tableDir) + require.Len(t, fileNames, 2) + require.ElementsMatch(t, []string{"CDC000001.csv", "CDC.index"}, fileNames) + content, err := os.ReadFile(path.Join(tableDir, "CDC000001.csv")) require.Nil(t, err) require.Greater(t, len(content), 0) - content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index")) require.Nil(t, err) - require.Equal(t, "CDC000001.json\n", string(content)) + require.Equal(t, "CDC000001.csv\n", string(content)) require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt)) // test date (day) is NOT changed. 
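The following hunks exercise the date separator: with date-separator=day the data directory gains a date component, i.e. {schema}/{table}/{tableVersion}/{yyyy-mm-dd}/, and the index file keeps its meta/ subdirectory underneath that dated path. A small sketch that mirrors DmlPathKey.GenerateDMLFilePath from pkg/sink/cloudstorage/path_key.go later in this patch; the standalone function and the concrete values are illustrative only.

package main

import (
	"fmt"
	"strings"
)

// dmlFilePath mirrors DmlPathKey.GenerateDMLFilePath: the partition and date
// components are appended only when present.
func dmlFilePath(schema, table string, tableVersion uint64, partitionNum int64, date string, idx uint64, ext string) string {
	elems := []string{schema, table, fmt.Sprintf("%d", tableVersion)}
	if partitionNum != 0 {
		elems = append(elems, fmt.Sprintf("%d", partitionNum))
	}
	if date != "" {
		elems = append(elems, date)
	}
	elems = append(elems, fmt.Sprintf("CDC%06d%s", idx, ext))
	return strings.Join(elems, "/")
}

func main() {
	// Prints test/table1/33/2023-03-09/CDC000001.csv, the dated layout the
	// following hunks assert once the day separator kicks in.
	fmt.Println(dmlFilePath("test", "table1", 33, 0, "2023-03-09", 1, ".csv"))
}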
@@ -216,21 +227,16 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { require.Nil(t, err) time.Sleep(3 * time.Second) - files, err = os.ReadDir(tableDir) - require.Nil(t, err) - require.Len(t, files, 3) - fileNames = nil - for _, f := range files { - fileNames = append(fileNames, f.Name()) - } - require.ElementsMatch(t, []string{"CDC000001.json", "CDC000002.json", "CDC.index"}, fileNames) - content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json")) + fileNames = getTableFiles(t, tableDir) + require.Len(t, fileNames, 3) + require.ElementsMatch(t, []string{"CDC000001.csv", "CDC000002.csv", "CDC.index"}, fileNames) + content, err = os.ReadFile(path.Join(tableDir, "CDC000002.csv")) require.Nil(t, err) require.Greater(t, len(content), 0) - content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index")) require.Nil(t, err) - require.Equal(t, "CDC000002.json\n", string(content)) + require.Equal(t, "CDC000002.csv\n", string(content)) require.Equal(t, uint64(2000), atomic.LoadUint64(&cnt)) // test date (day) is changed. @@ -242,21 +248,16 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { time.Sleep(3 * time.Second) tableDir = path.Join(parentDir, "test/table1/33/2023-03-09") - files, err = os.ReadDir(tableDir) - require.Nil(t, err) - require.Len(t, files, 2) - fileNames = nil - for _, f := range files { - fileNames = append(fileNames, f.Name()) - } - require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames) - content, err = os.ReadFile(path.Join(tableDir, "CDC000001.json")) + fileNames = getTableFiles(t, tableDir) + require.Len(t, fileNames, 2) + require.ElementsMatch(t, []string{"CDC000001.csv", "CDC.index"}, fileNames) + content, err = os.ReadFile(path.Join(tableDir, "CDC000001.csv")) require.Nil(t, err) require.Greater(t, len(content), 0) - content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index")) require.Nil(t, err) - require.Equal(t, "CDC000001.json\n", string(content)) + require.Equal(t, "CDC000001.csv\n", string(content)) require.Equal(t, uint64(3000), atomic.LoadUint64(&cnt)) cancel() s.Close() @@ -274,21 +275,16 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { require.Nil(t, err) time.Sleep(3 * time.Second) - files, err = os.ReadDir(tableDir) - require.Nil(t, err) - require.Len(t, files, 3) - fileNames = nil - for _, f := range files { - fileNames = append(fileNames, f.Name()) - } - require.ElementsMatch(t, []string{"CDC000001.json", "CDC000002.json", "CDC.index"}, fileNames) - content, err = os.ReadFile(path.Join(tableDir, "CDC000002.json")) + fileNames = getTableFiles(t, tableDir) + require.Len(t, fileNames, 3) + require.ElementsMatch(t, []string{"CDC000001.csv", "CDC000002.csv", "CDC.index"}, fileNames) + content, err = os.ReadFile(path.Join(tableDir, "CDC000002.csv")) require.Nil(t, err) require.Greater(t, len(content), 0) - content, err = os.ReadFile(path.Join(tableDir, "CDC.index")) + content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index")) require.Nil(t, err) - require.Equal(t, "CDC000002.json\n", string(content)) + require.Equal(t, "CDC000002.csv\n", string(content)) require.Equal(t, uint64(1000), atomic.LoadUint64(&cnt)) cancel() diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go index a41e9ca7dc2..ccc93523d72 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go +++ 
b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go @@ -15,7 +15,6 @@ package cloudstorage import ( "bytes" "context" - "encoding/json" "path" "sync" "sync/atomic" @@ -152,7 +151,7 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error { } // generate scheme.json file before generating the first data file if necessary - err := d.writeSchemaFile(ctx, table, tbl.tableInfo) + err := d.filePathGenerator.CheckOrWriteSchema(ctx, table, tbl.tableInfo) if err != nil { log.Error("failed to write schema file to external storage", zap.Int("workerID", d.id), @@ -218,42 +217,6 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error { } } -// In order to avoid spending so much time lookuping directory and getting last write point -// (i.e. which dir and which file) when the changefeed is restarted, we'd rather switch to -// a new dir and start writing. In this case, schema file should be created in the new dir -// if it hasn't been created when a DDL event was executed. -func (d *dmlWorker) writeSchemaFile( - ctx context.Context, - table cloudstorage.VersionedTableName, - tableInfo *model.TableInfo, -) error { - if ok := d.filePathGenerator.Contains(table); !ok { - var tableDetail cloudstorage.TableDefinition - tableDetail.FromTableInfo(tableInfo, table.TableInfoVersion) - path := cloudstorage.GenerateSchemaFilePath(tableDetail) - // the file may have been created when a DDL event was executed. - exist, err := d.storage.FileExists(ctx, path) - if err != nil { - return err - } - if exist { - return nil - } - - encodedDetail, err := json.MarshalIndent(tableDetail, "", " ") - if err != nil { - return err - } - - err = d.storage.WriteFile(ctx, path, encodedDetail) - if err != nil { - return err - } - } - - return nil -} - func (d *dmlWorker) writeIndexFile(ctx context.Context, path, content string) error { err := d.storage.WriteFile(ctx, path, []byte(content)) return err diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go index 64d01de6ff6..d809e7960db 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go +++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go @@ -128,12 +128,12 @@ func TestDMLWorkerRun(t *testing.T) { // check whether files for table1 has been generated files, err := os.ReadDir(table1Dir) require.Nil(t, err) - require.Len(t, files, 3) + require.Len(t, files, 2) var fileNames []string for _, f := range files { fileNames = append(fileNames, f.Name()) } - require.ElementsMatch(t, []string{"CDC000001.json", "schema.json", "CDC.index"}, fileNames) + require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames) cancel() d.close() wg.Wait() diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 75657f21891..2554ed48b03 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -22,9 +22,7 @@ import ( "net/url" "os" "os/signal" - "regexp" "sort" - "strconv" "strings" "sync" "syscall" @@ -106,122 +104,6 @@ func init() { } } -type schemaPathKey struct { - schema string - table string - version int64 -} - -func (s schemaPathKey) generagteSchemaFilePath() string { - return fmt.Sprintf("%s/%s/%d/schema.json", s.schema, s.table, s.version) -} - -func (s *schemaPathKey) parseSchemaFilePath(path string) error { - str := `(\w+)\/(\w+)\/(\d+)\/schema.json` - pathRE, err := regexp.Compile(str) - if err != nil { - return err - } - - matches := pathRE.FindStringSubmatch(path) - if len(matches) != 4 { - return fmt.Errorf("cannot match schema path pattern 
for %s", path) - } - - version, err := strconv.ParseUint(matches[3], 10, 64) - if err != nil { - return err - } - - *s = schemaPathKey{ - schema: matches[1], - table: matches[2], - version: int64(version), - } - return nil -} - -type dmlPathKey struct { - schemaPathKey - partitionNum int64 - date string -} - -func (d *dmlPathKey) generateDMLFilePath(idx uint64, extension string) string { - var elems []string - - elems = append(elems, d.schema) - elems = append(elems, d.table) - elems = append(elems, fmt.Sprintf("%d", d.version)) - - if d.partitionNum != 0 { - elems = append(elems, fmt.Sprintf("%d", d.partitionNum)) - } - if len(d.date) != 0 { - elems = append(elems, d.date) - } - elems = append(elems, fmt.Sprintf("CDC%06d%s", idx, extension)) - - return strings.Join(elems, "/") -} - -// dml file path pattern is as follows: -// {schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/CDC{num}.extension -// in this pattern, partition-separator and date-separator could be empty. -func (d *dmlPathKey) parseDMLFilePath(dateSeparator, path string) (uint64, error) { - var partitionNum int64 - - str := `(\w+)\/(\w+)\/(\d+)\/(\d+)?\/*` - switch dateSeparator { - case config.DateSeparatorNone.String(): - str += `(\d{4})*` - case config.DateSeparatorYear.String(): - str += `(\d{4})\/` - case config.DateSeparatorMonth.String(): - str += `(\d{4}-\d{2})\/` - case config.DateSeparatorDay.String(): - str += `(\d{4}-\d{2}-\d{2})\/` - } - str += `CDC(\d+).\w+` - pathRE, err := regexp.Compile(str) - if err != nil { - return 0, err - } - - matches := pathRE.FindStringSubmatch(path) - if len(matches) != 7 { - return 0, fmt.Errorf("cannot match dml path pattern for %s", path) - } - - version, err := strconv.ParseInt(matches[3], 10, 64) - if err != nil { - return 0, err - } - - if len(matches[4]) > 0 { - partitionNum, err = strconv.ParseInt(matches[4], 10, 64) - if err != nil { - return 0, err - } - } - fileIdx, err := strconv.ParseUint(strings.TrimLeft(matches[6], "0"), 10, 64) - if err != nil { - return 0, err - } - - *d = dmlPathKey{ - schemaPathKey: schemaPathKey{ - schema: matches[1], - table: matches[2], - version: version, - }, - partitionNum: partitionNum, - date: matches[5], - } - - return fileIdx, nil -} - // fileIndexRange defines a range of files. eg. 
CDC000002.csv ~ CDC000005.csv type fileIndexRange struct { start uint64 @@ -236,9 +118,11 @@ type consumer struct { externalStorage storage.ExternalStorage fileExtension string // tableDMLIdxMap maintains a map of - tableDMLIdxMap map[dmlPathKey]uint64 + tableDMLIdxMap map[cloudstorage.DmlPathKey]uint64 // tableTsMap maintains a map of tableTsMap map[model.TableID]model.ResolvedTs + // tableDefMap maintains a map of <`schema`.`table`, tableDef slice sorted by TableVersion> + tableDefMap map[string]map[uint64]*cloudstorage.TableDefinition // tableSinkMap maintains a map of tableSinkMap map[model.TableID]tablesink.TableSink tableIDGenerator *fakeTableIDGenerator @@ -321,8 +205,9 @@ func newConsumer(ctx context.Context) (*consumer, error) { externalStorage: storage, fileExtension: extension, errCh: errCh, - tableDMLIdxMap: make(map[dmlPathKey]uint64), + tableDMLIdxMap: make(map[cloudstorage.DmlPathKey]uint64), tableTsMap: make(map[model.TableID]model.ResolvedTs), + tableDefMap: make(map[string]map[uint64]*cloudstorage.TableDefinition), tableSinkMap: make(map[model.TableID]tablesink.TableSink), tableIDGenerator: &fakeTableIDGenerator{ tableIDs: make(map[string]int64), @@ -331,8 +216,10 @@ func newConsumer(ctx context.Context) (*consumer, error) { } // map1 - map2 -func diffDMLMaps(map1, map2 map[dmlPathKey]uint64) map[dmlPathKey]fileIndexRange { - resMap := make(map[dmlPathKey]fileIndexRange) +func diffDMLMaps( + map1, map2 map[cloudstorage.DmlPathKey]uint64, +) map[cloudstorage.DmlPathKey]fileIndexRange { + resMap := make(map[cloudstorage.DmlPathKey]fileIndexRange) for k, v := range map1 { if _, ok := map2[k]; !ok { resMap[k] = fileIndexRange{ @@ -351,48 +238,27 @@ func diffDMLMaps(map1, map2 map[dmlPathKey]uint64) map[dmlPathKey]fileIndexRange } // getNewFiles returns newly created dml files in specific ranges -func (c *consumer) getNewFiles(ctx context.Context) (map[dmlPathKey]fileIndexRange, error) { - tableDMLMap := make(map[dmlPathKey]fileIndexRange) +func (c *consumer) getNewFiles( + ctx context.Context, +) (map[cloudstorage.DmlPathKey]fileIndexRange, error) { + tableDMLMap := make(map[cloudstorage.DmlPathKey]fileIndexRange) opt := &storage.WalkOption{SubDir: ""} - origDMLIdxMap := make(map[dmlPathKey]uint64, len(c.tableDMLIdxMap)) + origDMLIdxMap := make(map[cloudstorage.DmlPathKey]uint64, len(c.tableDMLIdxMap)) for k, v := range c.tableDMLIdxMap { origDMLIdxMap[k] = v } err := c.externalStorage.WalkDir(ctx, opt, func(path string, size int64) error { - var dmlkey dmlPathKey - var schemaKey schemaPathKey - var fileIdx uint64 - var err error - - if strings.HasSuffix(path, "schema.json") { - err = schemaKey.parseSchemaFilePath(path) + if cloudstorage.IsSchemaFile(path) { + err := c.parseSchemaFilePath(ctx, path) if err != nil { log.Error("failed to parse schema file path", zap.Error(err)) // skip handling this file return nil } - // fake a dml key for schema.json file, which is useful for putting DDL - // in front of the DML files when sorting. 
- // e.g, for the partitioned table: - // - // test/test1/439972354120482843/schema.json (partitionNum = -1) - // test/test1/439972354120482843/55/2023-03-09/CDC000001.csv (partitionNum = 55) - // test/test1/439972354120482843/66/2023-03-09/CDC000001.csv (partitionNum = 66) - // - // and for the non-partitioned table: - // test/test2/439972354120482843/schema.json (partitionNum = -1) - // test/test2/439972354120482843/2023-03-09/CDC000001.csv (partitionNum = 0) - // test/test2/439972354120482843/2023-03-09/CDC000002.csv (partitionNum = 0) - // - // the DDL event recorded in schema.json should be executed first, then the DML events - // in csv files can be executed. - dmlkey.schemaPathKey = schemaKey - dmlkey.partitionNum = fakePartitionNumForSchemaFile - dmlkey.date = "" } else if strings.HasSuffix(path, c.fileExtension) { - fileIdx, err = dmlkey.parseDMLFilePath(c.replicationCfg.Sink.DateSeparator, path) + err := c.parseDMLFilePath(ctx, path) if err != nil { log.Error("failed to parse dml file path", zap.Error(err)) // skip handling this file @@ -400,13 +266,7 @@ func (c *consumer) getNewFiles(ctx context.Context) (map[dmlPathKey]fileIndexRan } } else { log.Debug("ignore handling file", zap.String("path", path)) - return nil } - - if _, ok := c.tableDMLIdxMap[dmlkey]; !ok || fileIdx >= c.tableDMLIdxMap[dmlkey] { - c.tableDMLIdxMap[dmlkey] = fileIdx - } - return nil }) if err != nil { @@ -421,7 +281,7 @@ func (c *consumer) getNewFiles(ctx context.Context) (map[dmlPathKey]fileIndexRan func (c *consumer) emitDMLEvents( ctx context.Context, tableID int64, tableDetail cloudstorage.TableDefinition, - pathKey dmlPathKey, + pathKey cloudstorage.DmlPathKey, content []byte, ) error { var ( @@ -493,9 +353,9 @@ func (c *consumer) emitDMLEvents( filteredCnt++ } } - log.Info("decode success", zap.String("schema", pathKey.schema), - zap.String("table", pathKey.table), - zap.Int64("version", pathKey.version), + log.Info("decode success", zap.String("schema", pathKey.Schema), + zap.String("table", pathKey.Table), + zap.Uint64("version", pathKey.TableVersion), zap.Int("decodeRowsCnt", cnt), zap.Int("filteredRowsCnt", filteredCnt)) @@ -529,17 +389,17 @@ func (c *consumer) waitTableFlushComplete(ctx context.Context, tableID model.Tab func (c *consumer) syncExecDMLEvents( ctx context.Context, tableDef cloudstorage.TableDefinition, - key dmlPathKey, + key cloudstorage.DmlPathKey, fileIdx uint64, ) error { - filePath := key.generateDMLFilePath(fileIdx, c.fileExtension) + filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension) log.Debug("read from dml file path", zap.String("path", filePath)) content, err := c.externalStorage.ReadFile(ctx, filePath) if err != nil { return errors.Trace(err) } tableID := c.tableIDGenerator.generateFakeTableID( - key.schema, key.table, key.partitionNum) + key.Schema, key.Table, key.PartitionNum) err = c.emitDMLEvents(ctx, tableID, tableDef, key, content) if err != nil { return errors.Trace(err) @@ -558,31 +418,108 @@ func (c *consumer) syncExecDMLEvents( return nil } -func (c *consumer) getTableDefFromFile( - ctx context.Context, - schemaKey schemaPathKey, -) (cloudstorage.TableDefinition, error) { - var tableDef cloudstorage.TableDefinition +func (c *consumer) parseDMLFilePath(_ context.Context, path string) error { + var dmlkey cloudstorage.DmlPathKey + fileIdx, err := dmlkey.ParseDMLFilePath(c.replicationCfg.Sink.DateSeparator, path) + if err != nil { + return errors.Trace(err) + } - schemaFilePath := schemaKey.generagteSchemaFilePath() - schemaContent, err := 
c.externalStorage.ReadFile(ctx, schemaFilePath) + if _, ok := c.tableDMLIdxMap[dmlkey]; !ok || fileIdx >= c.tableDMLIdxMap[dmlkey] { + c.tableDMLIdxMap[dmlkey] = fileIdx + } + return nil +} + +func (c *consumer) parseSchemaFilePath(ctx context.Context, path string) error { + var schemaKey cloudstorage.SchemaPathKey + checksumInFile, err := schemaKey.ParseSchemaFilePath(path) if err != nil { - return tableDef, errors.Trace(err) + return errors.Trace(err) + } + key := schemaKey.GetKey() + if tableDefs, ok := c.tableDefMap[key]; ok { + if _, ok := tableDefs[schemaKey.TableVersion]; ok { + // Skip if tableDef already exists. + return nil + } + } else { + c.tableDefMap[key] = make(map[uint64]*cloudstorage.TableDefinition) } + // Read tableDef from schema file and check checksum. + var tableDef cloudstorage.TableDefinition + schemaContent, err := c.externalStorage.ReadFile(ctx, path) + if err != nil { + return errors.Trace(err) + } err = json.Unmarshal(schemaContent, &tableDef) if err != nil { - return tableDef, errors.Trace(err) + return errors.Trace(err) } + checksumInMem, err := tableDef.Sum32(nil) + if err != nil { + return errors.Trace(err) + } + if checksumInMem != checksumInFile || schemaKey.TableVersion != tableDef.TableVersion { + log.Panic("checksum mismatch", + zap.Uint32("checksumInMem", checksumInMem), + zap.Uint32("checksumInFile", checksumInFile), + zap.Uint64("tableversionInMem", schemaKey.TableVersion), + zap.Uint64("tableversionInFile", tableDef.TableVersion), + zap.String("path", path)) + } + + // Update tableDefMap. + c.tableDefMap[key][tableDef.TableVersion] = &tableDef + + // Fake a dml key for schema.json file, which is useful for putting DDL + // in front of the DML files when sorting. + // e.g, for the partitioned table: + // + // test/test1/439972354120482843/schema.json (partitionNum = -1) + // test/test1/439972354120482843/55/2023-03-09/CDC000001.csv (partitionNum = 55) + // test/test1/439972354120482843/66/2023-03-09/CDC000001.csv (partitionNum = 66) + // + // and for the non-partitioned table: + // test/test2/439972354120482843/schema.json (partitionNum = -1) + // test/test2/439972354120482843/2023-03-09/CDC000001.csv (partitionNum = 0) + // test/test2/439972354120482843/2023-03-09/CDC000002.csv (partitionNum = 0) + // + // the DDL event recorded in schema.json should be executed first, then the DML events + // in csv files can be executed. + dmlkey := cloudstorage.DmlPathKey{ + SchemaPathKey: schemaKey, + PartitionNum: fakePartitionNumForSchemaFile, + Date: "", + } + if _, ok := c.tableDMLIdxMap[dmlkey]; !ok { + c.tableDMLIdxMap[dmlkey] = 0 + } else { + // duplicate table schema file found, this should not happen. 
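+		// A schema file whose table version was already recorded returns early from
+		// this function above, so an existing dmlkey here means tableDMLIdxMap and
+		// tableDefMap have gone out of sync for this (schema, table, version).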
+ log.Panic("duplicate schema file found", + zap.String("path", path), zap.Any("tableDef", tableDef), + zap.Any("schemaKey", schemaKey), zap.Any("dmlkey", dmlkey)) + } + return nil +} - return tableDef, nil +func (c *consumer) mustGetTableDef(key cloudstorage.SchemaPathKey) cloudstorage.TableDefinition { + var tableDef *cloudstorage.TableDefinition + if tableDefs, ok := c.tableDefMap[key.GetKey()]; ok { + tableDef = tableDefs[key.TableVersion] + } + if tableDef == nil { + log.Panic("tableDef not found", zap.Any("key", key), zap.Any("tableDefMap", c.tableDefMap)) + } + return *tableDef } func (c *consumer) handleNewFiles( ctx context.Context, - dmlFileMap map[dmlPathKey]fileIndexRange, + dmlFileMap map[cloudstorage.DmlPathKey]fileIndexRange, ) error { - keys := make([]dmlPathKey, 0, len(dmlFileMap)) + keys := make([]cloudstorage.DmlPathKey, 0, len(dmlFileMap)) for k := range dmlFileMap { keys = append(keys, k) } @@ -591,30 +528,27 @@ func (c *consumer) handleNewFiles( return nil } sort.Slice(keys, func(i, j int) bool { - if keys[i].version != keys[j].version { - return keys[i].version < keys[j].version + if keys[i].TableVersion != keys[j].TableVersion { + return keys[i].TableVersion < keys[j].TableVersion } - if keys[i].partitionNum != keys[j].partitionNum { - return keys[i].partitionNum < keys[j].partitionNum + if keys[i].PartitionNum != keys[j].PartitionNum { + return keys[i].PartitionNum < keys[j].PartitionNum } - if keys[i].date != keys[j].date { - return keys[i].date < keys[j].date + if keys[i].Date != keys[j].Date { + return keys[i].Date < keys[j].Date } - if keys[i].schema != keys[j].schema { - return keys[i].schema < keys[j].schema + if keys[i].Schema != keys[j].Schema { + return keys[i].Schema < keys[j].Schema } - return keys[i].table < keys[j].table + return keys[i].Table < keys[j].Table }) for _, key := range keys { - tableDef, err := c.getTableDefFromFile(ctx, key.schemaPathKey) - if err != nil { - return err - } + tableDef := c.mustGetTableDef(key.SchemaPathKey) // if the key is a fake dml path key which is mainly used for // sorting schema.json file before the dml files, then execute the ddl query. - if key.partitionNum == fakePartitionNumForSchemaFile && - len(key.date) == 0 && len(tableDef.Query) > 0 { + if key.PartitionNum == fakePartitionNumForSchemaFile && + len(key.Date) == 0 && len(tableDef.Query) > 0 { ddlEvent, err := tableDef.ToDDLEvent() if err != nil { return err @@ -622,6 +556,7 @@ func (c *consumer) handleNewFiles( if err := c.ddlSink.WriteDDLEvent(ctx, ddlEvent); err != nil { return errors.Trace(err) } + // TODO: need to cleanup tableDefMap in the future. log.Info("execute ddl event successfully", zap.String("query", tableDef.Query)) continue } diff --git a/pkg/sink/cloudstorage/main_test.go b/pkg/sink/cloudstorage/main_test.go new file mode 100644 index 00000000000..6785071a688 --- /dev/null +++ b/pkg/sink/cloudstorage/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package cloudstorage
+
+import (
+	"testing"
+
+	"github.com/pingcap/tiflow/pkg/leakutil"
+)
+
+func TestMain(m *testing.M) {
+	leakutil.SetUpLeakTest(m)
+}
diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go
index bd150c0847a..ed5100202a0 100644
--- a/pkg/sink/cloudstorage/path.go
+++ b/pkg/sink/cloudstorage/path.go
@@ -17,14 +17,18 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"regexp"
 	"strconv"
 	"strings"
 
+	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/engine/pkg/clock"
 	"github.com/pingcap/tiflow/pkg/config"
-	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/hash"
+	"go.uber.org/zap"
 )
 
 const (
@@ -32,11 +36,67 @@ const (
 	// at least 6 digits (e.g. CDC000001.csv).
 	minFileNamePrefixLen = 9
 	defaultIndexFileName = "CDC.index"
+
+	// The following constants are used to generate file paths.
+	schemaFileNameFormat = "schema_%d_%010d.json"
+	// The database schema is stored in the following path:
+	// <schema>/meta/schema_{tableVersion}_{checksum}.json
+	dbSchemaPrefix = "%s/meta/"
+	// The table schema is stored in the following path:
+	// <schema>/<table>/meta/schema_{tableVersion}_{checksum}.json
+	tableSchemaPrefix = "%s/%s/meta/"
 )
 
-// GenerateSchemaFilePath generates schema file path based on the table definition.
-func GenerateSchemaFilePath(def TableDefinition) string {
-	return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion)
+var schemaRE = regexp.MustCompile(`meta/schema_\d+_\d{10}\.json$`)
+
+// IsSchemaFile checks whether the file is a schema file.
+func IsSchemaFile(path string) bool {
+	return schemaRE.MatchString(path)
+}
+
+// MustParseSchemaName parses the version from the schema file name.
+func MustParseSchemaName(path string) (uint64, uint32) {
+	reportErr := func(err error) {
+		log.Panic("failed to parse schema file name",
+			zap.String("schemaPath", path),
+			zap.Any("error", err))
+	}
+
+	// For <schema>/<table>/meta/schema_{tableVersion}_{checksum}.json, the parts
+	// should be ["<schema>/<table>
/meta/schema", "{tableVersion}", "{checksum}.json"]. + parts := strings.Split(path, "_") + if len(parts) < 3 { + reportErr(errors.New("invalid path format")) + } + + checksum := strings.TrimSuffix(parts[len(parts)-1], ".json") + tableChecksum, err := strconv.ParseUint(checksum, 10, 64) + if err != nil { + reportErr(err) + } + version := parts[len(parts)-2] + tableVersion, err := strconv.ParseUint(version, 10, 64) + if err != nil { + reportErr(err) + } + return tableVersion, uint32(tableChecksum) +} + +func generateSchemaFilePath( + schema, table string, tableVersion uint64, checksum uint32, +) string { + if schema == "" || tableVersion == 0 { + log.Panic("invalid schema or tableVersion", + zap.String("schema", schema), zap.Uint64("tableVersion", tableVersion)) + } + if table == "" { + // Generate db schema file path. + return fmt.Sprintf(dbSchemaPrefix+schemaFileNameFormat, + schema, tableVersion, checksum) + } + // Generate table schema file path. + return fmt.Sprintf(tableSchemaPrefix+schemaFileNameFormat, + schema, table, tableVersion, checksum) } type indexWithDate struct { @@ -63,6 +123,9 @@ type FilePathGenerator struct { clock clock.Clock storage storage.ExternalStorage fileIndex map[VersionedTableName]*indexWithDate + + hasher *hash.PositionInertia + versionMap map[VersionedTableName]uint64 } // NewFilePathGenerator creates a FilePathGenerator. @@ -73,12 +136,98 @@ func NewFilePathGenerator( clock clock.Clock, ) *FilePathGenerator { return &FilePathGenerator{ - config: config, - extension: extension, - storage: storage, - clock: clock, - fileIndex: make(map[VersionedTableName]*indexWithDate), + config: config, + extension: extension, + storage: storage, + clock: clock, + fileIndex: make(map[VersionedTableName]*indexWithDate), + hasher: hash.NewPositionInertia(), + versionMap: make(map[VersionedTableName]uint64), + } +} + +// CheckOrWriteSchema checks whether the schema file exists in the storage and +// write scheme.json if necessary. +func (f *FilePathGenerator) CheckOrWriteSchema( + ctx context.Context, + table VersionedTableName, + tableInfo *model.TableInfo, +) error { + if _, ok := f.versionMap[table]; ok { + return nil + } + + var def TableDefinition + def.FromTableInfo(tableInfo, table.TableInfoVersion) + if !def.IsTableSchema() { + // only check schema for table + log.Panic("invalid table schema", zap.Any("versionedTableName", table), + zap.Any("tableInfo", tableInfo)) + } + + // Case 1: point check if the schema file exists. + tblSchemaFile, err := def.GenerateSchemaFilePath() + if err != nil { + return err + } + exist, err := f.storage.FileExists(ctx, tblSchemaFile) + if err != nil { + return err + } + if exist { + f.versionMap[table] = table.TableInfoVersion + return nil + } + + // walk the table meta path to find the last schema file + _, checksum := MustParseSchemaName(tblSchemaFile) + schemaFileCnt := 0 + lastVersion := uint64(0) + prefix := fmt.Sprintf(tableSchemaPrefix+"schema_", def.Schema, def.Table) + checksumSuffix := fmt.Sprintf("%010d.json", checksum) + err = f.storage.WalkDir(ctx, &storage.WalkOption{ObjPrefix: prefix}, + func(path string, _ int64) error { + schemaFileCnt++ + if !strings.HasSuffix(path, checksumSuffix) { + return nil + } + version, parsedChecksum := MustParseSchemaName(path) + if parsedChecksum != checksum { + // TODO: parsedChecksum should be ignored, remove this panic + // after the new path protocol is verified. 
+ log.Panic("invalid schema file name", + zap.String("path", path), zap.Any("checksum", checksum)) + } + if version > lastVersion { + lastVersion = version + } + return nil + }, + ) + if err != nil { + return err + } + + // Case 2: the table meta path is not empty. + if schemaFileCnt != 0 { + if lastVersion == 0 { + log.Panic("no table schema file found in an non-empty meta path", + zap.Any("versionedTableName", table), + zap.Uint32("checksum", checksum)) + } + f.versionMap[table] = lastVersion + return nil } + + // Case 3: the table meta path is empty, which only happens when the table is + // existed before changefeed started. We need to write schema file to external + // storage. + encodedDetail, err := def.MarshalWithQuery() + if err != nil { + return err + } + f.versionMap[table] = table.TableInfoVersion + return f.storage.WriteFile(ctx, tblSchemaFile, encodedDetail) } // SetClock is used for unit test @@ -86,12 +235,6 @@ func (f *FilePathGenerator) SetClock(clock clock.Clock) { f.clock = clock } -// Contains checks if a VersionedTable is cached by FilePathGenerator before. -func (f *FilePathGenerator) Contains(tbl VersionedTableName) bool { - _, ok := f.fileIndex[tbl] - return ok -} - // GenerateDateStr generates a date string base on current time // and the date-separator configuration item. func (f *FilePathGenerator) GenerateDateStr() string { @@ -116,7 +259,7 @@ func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date str elems = append(elems, tbl.TableNameWithPhysicTableID.Schema) elems = append(elems, tbl.TableNameWithPhysicTableID.Table) - elems = append(elems, fmt.Sprintf("%d", tbl.TableInfoVersion)) + elems = append(elems, fmt.Sprintf("%d", f.versionMap[tbl])) if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition { elems = append(elems, fmt.Sprintf("%d", tbl.TableNameWithPhysicTableID.TableID)) @@ -136,14 +279,14 @@ func (f *FilePathGenerator) fetchIndexFromFileName(fileName string) (uint64, err if len(fileName) < minFileNamePrefixLen+len(f.extension) || !strings.HasPrefix(fileName, "CDC") || !strings.HasSuffix(fileName, f.extension) { - return 0, cerror.WrapError(cerror.ErrStorageSinkInvalidFileName, + return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName, fmt.Errorf("'%s' is a invalid file name", fileName)) } extIdx := strings.Index(fileName, f.extension) fileIdxStr := fileName[3:extIdx] if fileIdx, err = strconv.ParseUint(fileIdxStr, 10, 64); err != nil { - return 0, cerror.WrapError(cerror.ErrStorageSinkInvalidFileName, err) + return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName, err) } return fileIdx, nil diff --git a/pkg/sink/cloudstorage/path_key.go b/pkg/sink/cloudstorage/path_key.go new file mode 100644 index 00000000000..f8575c8cde6 --- /dev/null +++ b/pkg/sink/cloudstorage/path_key.go @@ -0,0 +1,154 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package cloudstorage
+
+import (
+	"fmt"
+	"regexp"
+	"strconv"
+	"strings"
+
+	"github.com/pingcap/tiflow/pkg/config"
+	"github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/quotes"
+)
+
+// SchemaPathKey is the key of schema path.
+type SchemaPathKey struct {
+	Schema       string
+	Table        string
+	TableVersion uint64
+}
+
+// GetKey returns the key of schema path.
+func (s *SchemaPathKey) GetKey() string {
+	return quotes.QuoteSchema(s.Schema, s.Table)
+}
+
+// ParseSchemaFilePath parses the schema file path and returns the table version and checksum.
+func (s *SchemaPathKey) ParseSchemaFilePath(path string) (uint32, error) {
+	// For <schema>/<table>/meta/schema_{tableVersion}_{checksum}.json, the parts
+	// should be ["<schema>", "<table>
", "meta", "schema_{tableVersion}_{checksum}.json"]. + matches := strings.Split(path, "/") + + var schema, table string + schema = matches[0] + switch len(matches) { + case 3: + table = "" + case 4: + table = matches[1] + default: + return 0, errors.Trace(fmt.Errorf("cannot match schema path pattern for %s", path)) + } + + if matches[len(matches)-2] != "meta" { + return 0, errors.Trace(fmt.Errorf("cannot match schema path pattern for %s", path)) + } + + schemaFileName := matches[len(matches)-1] + version, checksum := MustParseSchemaName(schemaFileName) + + *s = SchemaPathKey{ + Schema: schema, + Table: table, + TableVersion: version, + } + return checksum, nil +} + +// DmlPathKey is the key of dml path. +type DmlPathKey struct { + SchemaPathKey + PartitionNum int64 + Date string +} + +// GenerateDMLFilePath generates the dml file path. +func (d *DmlPathKey) GenerateDMLFilePath(idx uint64, extension string) string { + var elems []string + + elems = append(elems, d.Schema) + elems = append(elems, d.Table) + elems = append(elems, fmt.Sprintf("%d", d.TableVersion)) + + if d.PartitionNum != 0 { + elems = append(elems, fmt.Sprintf("%d", d.PartitionNum)) + } + if len(d.Date) != 0 { + elems = append(elems, d.Date) + } + elems = append(elems, fmt.Sprintf("CDC%06d%s", idx, extension)) + + return strings.Join(elems, "/") +} + +// ParseDMLFilePath parses the dml file path and returns the max file index. +// DML file path pattern is as follows: +// {schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/, where +// partition-separator and date-separator could be empty. +// DML file name pattern is as follows: CDC{num}.extension. +func (d *DmlPathKey) ParseDMLFilePath(dateSeparator, path string) (uint64, error) { + var partitionNum int64 + + str := `(\w+)\/(\w+)\/(\d+)\/(\d+)?\/*` + switch dateSeparator { + case config.DateSeparatorNone.String(): + str += `(\d{4})*` + case config.DateSeparatorYear.String(): + str += `(\d{4})\/` + case config.DateSeparatorMonth.String(): + str += `(\d{4}-\d{2})\/` + case config.DateSeparatorDay.String(): + str += `(\d{4}-\d{2}-\d{2})\/` + } + str += `CDC(\d+).\w+` + pathRE, err := regexp.Compile(str) + if err != nil { + return 0, err + } + + matches := pathRE.FindStringSubmatch(path) + if len(matches) != 7 { + return 0, fmt.Errorf("cannot match dml path pattern for %s", path) + } + + version, err := strconv.ParseUint(matches[3], 10, 64) + if err != nil { + return 0, err + } + + if len(matches[4]) > 0 { + partitionNum, err = strconv.ParseInt(matches[4], 10, 64) + if err != nil { + return 0, err + } + } + fileIdx, err := strconv.ParseUint(strings.TrimLeft(matches[6], "0"), 10, 64) + if err != nil { + return 0, err + } + + *d = DmlPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: matches[1], + Table: matches[2], + TableVersion: version, + }, + PartitionNum: partitionNum, + Date: matches[5], + } + + return fileIdx, nil +} diff --git a/pkg/sink/cloudstorage/path_key_test.go b/pkg/sink/cloudstorage/path_key_test.go new file mode 100644 index 00000000000..da6bb55cecc --- /dev/null +++ b/pkg/sink/cloudstorage/path_key_test.go @@ -0,0 +1,58 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cloudstorage
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSchemaPathKey(t *testing.T) {
+	t.Parallel()
+
+	testCases := []struct {
+		path      string
+		schemakey SchemaPathKey
+		checksum  uint32
+	}{
+		// Test for database schema path: <schema>/meta/schema_{tableVersion}_{checksum}.json
+		{
+			path: "test_schema/meta/schema_1_2.json",
+			schemakey: SchemaPathKey{
+				Schema:       "test_schema",
+				Table:        "",
+				TableVersion: 1,
+			},
+			checksum: 2,
+		},
+		// Test for table schema path: <schema>/<table>
/meta/schema_{tableVersion}_{checksum}.json + { + path: "test_schema/test_table/meta/schema_11_22.json", + schemakey: SchemaPathKey{ + Schema: "test_schema", + Table: "test_table", + TableVersion: 11, + }, + checksum: 22, + }, + } + for _, tc := range testCases { + var schemaKey SchemaPathKey + checksum, err := schemaKey.ParseSchemaFilePath(tc.path) + require.NoError(t, err) + require.Equal(t, tc.schemakey, schemaKey) + require.Equal(t, tc.checksum, checksum) + } +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index 9f57e153e27..72076fa6284 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -45,6 +45,8 @@ func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FileP } func TestGenerateDataFilePath(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -58,6 +60,7 @@ func TestGenerateDataFilePath(t *testing.T) { dir := t.TempDir() f := testFilePathGenerator(ctx, t, dir) + f.versionMap[table] = table.TableInfoVersion date := f.GenerateDateStr() // date-separator: none path, err := f.GenerateDataFilePath(ctx, table, date) @@ -70,6 +73,7 @@ func TestGenerateDataFilePath(t *testing.T) { // date-separator: year mockClock := clock.NewMock() f = testFilePathGenerator(ctx, t, dir) + f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorYear.String() f.clock = mockClock mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) @@ -93,6 +97,7 @@ func TestGenerateDataFilePath(t *testing.T) { // date-separator: month mockClock = clock.NewMock() f = testFilePathGenerator(ctx, t, dir) + f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorMonth.String() f.clock = mockClock mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) @@ -116,6 +121,7 @@ func TestGenerateDataFilePath(t *testing.T) { // date-separator: day mockClock = clock.NewMock() f = testFilePathGenerator(ctx, t, dir) + f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorDay.String() f.clock = mockClock mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) @@ -138,6 +144,8 @@ func TestGenerateDataFilePath(t *testing.T) { } func TestFetchIndexFromFileName(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -184,6 +192,8 @@ func TestFetchIndexFromFileName(t *testing.T) { } func TestGenerateDataFilePathWithIndexFile(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -200,6 +210,7 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) { }, TableInfoVersion: 5, } + f.versionMap[table] = table.TableInfoVersion date := f.GenerateDateStr() indexFilePath := f.GenerateIndexFilePath(table, date) err := f.storage.WriteFile(ctx, indexFilePath, []byte("CDC000005.json\n")) @@ -228,3 +239,37 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) { require.NoError(t, err) require.Equal(t, "test/table1/5/2023-03-09/CDC000006.json", dataFilePath) } + +func TestIsSchemaFile(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + path string + expect bool + }{ + { + "valid database schema /meta/", + "schema2/meta/schema_123_0123456789.json", true, + }, + { + "valid table schema /
/meta/", + "schema1/table1/meta/schema_123_0123456789.json", true, + }, + {"valid special prefix", "meta/meta/schema_123_0123456789.json", true}, + {"valid schema1", "meta/schema_123_0123456789.json", true}, + {"missing field1", "meta/schema_012345678_.json", false}, + {"missing field2", "meta/schema_012345678.json", false}, + {"invalid checksum1", "meta/schema_123_012345678.json", false}, + {"invalid checksum2", "meta/schema_123_012a4567c9.json", false}, + {"invalid table version", "meta/schema_abc_0123456789.json", false}, + {"invalid extension1", "meta/schema_123_0123456789.txt", false}, + {"invalid extension2", "meta/schema_123_0123456789.json ", false}, + {"invalid path", "meta/schema1/schema_123_0123456789.json", false}, + } + + for _, tt := range tests { + require.Equal(t, tt.expect, IsSchemaFile(tt.path), + "testCase: %s, path: %v", tt.name, tt.path) + } +} diff --git a/pkg/sink/cloudstorage/table_definition.go b/pkg/sink/cloudstorage/table_definition.go index 40b154cc2e6..a6314ce33f4 100644 --- a/pkg/sink/cloudstorage/table_definition.go +++ b/pkg/sink/cloudstorage/table_definition.go @@ -13,20 +13,27 @@ package cloudstorage import ( + "encoding/json" + "sort" "strconv" "strings" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/parser/charset" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/hash" "go.uber.org/zap" ) -const defaultTableDefinitionVersion = 1 +const ( + defaultTableDefinitionVersion = 1 + marshalPrefix = "" + marshalIndent = " " +) // TableCol denotes the column info for a table definition. type TableCol struct { @@ -156,6 +163,7 @@ func (t *TableCol) ToTiColumnInfo() (*timodel.ColumnInfo, error) { } // TableDefinition is the detailed table definition used for cloud storage sink. +// TODO: find a better name for this struct. type TableDefinition struct { Table string `json:"Table"` Schema string `json:"Schema"` @@ -167,6 +175,16 @@ type TableDefinition struct { TotalColumns int `json:"TableColumnsTotal"` } +// tableDefWithoutQuery is the table definition without query, which ignores the +// Query, Type and TableVersion field. +type tableDefWithoutQuery struct { + Table string `json:"Table"` + Schema string `json:"Schema"` + Version uint64 `json:"Version"` + Columns []TableCol `json:"TableColumns"` + TotalColumns int `json:"TableColumnsTotal"` +} + // FromDDLEvent converts from DDLEvent to TableDefinition. func (t *TableDefinition) FromDDLEvent(event *model.DDLEvent) { if event.CommitTs != event.TableInfo.Version { @@ -199,8 +217,11 @@ func (t *TableDefinition) FromTableInfo(info *model.TableInfo, tableInfoVersion t.Version = defaultTableDefinitionVersion t.TableVersion = tableInfoVersion - t.Table = info.TableName.Table t.Schema = info.TableName.Schema + if info.TableInfo == nil { + return + } + t.Table = info.TableName.Table t.TotalColumns = len(info.Columns) for _, col := range info.Columns { var tableCol TableCol @@ -230,3 +251,70 @@ func (t *TableDefinition) ToTableInfo() (*model.TableInfo, error) { return info, nil } + +// IsTableSchema returns whether the TableDefinition is a table schema. +func (t *TableDefinition) IsTableSchema() bool { + if len(t.Columns) != t.TotalColumns { + log.Panic("invalid table definition", zap.Any("tableDef", t)) + } + return t.TotalColumns != 0 +} + +// MarshalWithQuery marshals TableDefinition with Query field. 
+func (t *TableDefinition) MarshalWithQuery() ([]byte, error) { + data, err := json.MarshalIndent(t, marshalPrefix, marshalIndent) + if err != nil { + return nil, errors.WrapError(errors.ErrMarshalFailed, err) + } + return data, nil +} + +// marshalWithoutQuery marshals TableDefinition without Query field. +func (t *TableDefinition) marshalWithoutQuery() ([]byte, error) { + // sort columns by name + sortedColumns := make([]TableCol, len(t.Columns)) + copy(sortedColumns, t.Columns) + sort.Slice(sortedColumns, func(i, j int) bool { + return sortedColumns[i].Name < sortedColumns[j].Name + }) + + defWithoutQuery := tableDefWithoutQuery{ + Table: t.Table, + Schema: t.Schema, + Columns: sortedColumns, + TotalColumns: t.TotalColumns, + } + + data, err := json.MarshalIndent(defWithoutQuery, marshalPrefix, marshalIndent) + if err != nil { + return nil, errors.WrapError(errors.ErrMarshalFailed, err) + } + return data, nil +} + +// Sum32 returns the 32-bits hash value of TableDefinition. +func (t *TableDefinition) Sum32(hasher *hash.PositionInertia) (uint32, error) { + if hasher == nil { + hasher = hash.NewPositionInertia() + } + hasher.Reset() + data, err := t.marshalWithoutQuery() + if err != nil { + return 0, err + } + + hasher.Write(data) + return hasher.Sum32(), nil +} + +// GenerateSchemaFilePath generates the schema file path for TableDefinition. +func (t *TableDefinition) GenerateSchemaFilePath() (string, error) { + checksum, err := t.Sum32(nil) + if err != nil { + return "", err + } + if !t.IsTableSchema() && t.Table != "" { + log.Panic("invalid table definition", zap.Any("tableDef", t)) + } + return generateSchemaFilePath(t.Schema, t.Table, t.TableVersion, checksum), nil +} diff --git a/pkg/sink/cloudstorage/table_definition_test.go b/pkg/sink/cloudstorage/table_definition_test.go index cc43e91837b..ccf9fb881c0 100644 --- a/pkg/sink/cloudstorage/table_definition_test.go +++ b/pkg/sink/cloudstorage/table_definition_test.go @@ -15,6 +15,7 @@ package cloudstorage import ( "encoding/json" "math" + "math/rand" "testing" "github.com/pingcap/tidb/parser/charset" @@ -25,7 +26,46 @@ import ( "github.com/stretchr/testify/require" ) +func generateTableDef() (TableDefinition, *model.TableInfo) { + var columns []*timodel.ColumnInfo + ft := types.NewFieldType(mysql.TypeLong) + ft.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag) + col := &timodel.ColumnInfo{Name: timodel.NewCIStr("Id"), FieldType: *ft} + columns = append(columns, col) + + ft = types.NewFieldType(mysql.TypeVarchar) + ft.SetFlag(mysql.NotNullFlag) + ft.SetFlen(128) + col = &timodel.ColumnInfo{Name: timodel.NewCIStr("LastName"), FieldType: *ft} + columns = append(columns, col) + + ft = types.NewFieldType(mysql.TypeVarchar) + ft.SetFlen(64) + col = &timodel.ColumnInfo{Name: timodel.NewCIStr("FirstName"), FieldType: *ft} + columns = append(columns, col) + + ft = types.NewFieldType(mysql.TypeDatetime) + col = &timodel.ColumnInfo{Name: timodel.NewCIStr("Birthday"), FieldType: *ft} + columns = append(columns, col) + + tableInfo := &model.TableInfo{ + TableInfo: &timodel.TableInfo{Columns: columns}, + Version: 100, + TableName: model.TableName{ + Schema: "test", + Table: "table1", + TableID: 20, + }, + } + + var def TableDefinition + def.FromTableInfo(tableInfo, tableInfo.Version) + return def, tableInfo +} + func TestTableCol(t *testing.T) { + t.Parallel() + testCases := []struct { name string filedType byte @@ -323,39 +363,9 @@ func TestTableCol(t *testing.T) { } func TestTableDefinition(t *testing.T) { - var columns []*timodel.ColumnInfo - var def 
TableDefinition + t.Parallel() - tableInfo := &model.TableInfo{ - Version: 100, - TableName: model.TableName{ - Schema: "test", - Table: "table1", - TableID: 20, - }, - } - ft := types.NewFieldType(mysql.TypeLong) - ft.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag) - col := &timodel.ColumnInfo{Name: timodel.NewCIStr("Id"), FieldType: *ft} - columns = append(columns, col) - - ft = types.NewFieldType(mysql.TypeVarchar) - ft.SetFlag(mysql.NotNullFlag) - ft.SetFlen(128) - col = &timodel.ColumnInfo{Name: timodel.NewCIStr("LastName"), FieldType: *ft} - columns = append(columns, col) - - ft = types.NewFieldType(mysql.TypeVarchar) - ft.SetFlen(64) - col = &timodel.ColumnInfo{Name: timodel.NewCIStr("FirstName"), FieldType: *ft} - columns = append(columns, col) - - ft = types.NewFieldType(mysql.TypeDatetime) - col = &timodel.ColumnInfo{Name: timodel.NewCIStr("Birthday"), FieldType: *ft} - columns = append(columns, col) - - tableInfo.TableInfo = &timodel.TableInfo{Columns: columns} - def.FromTableInfo(tableInfo, tableInfo.Version) + def, tableInfo := generateTableDef() encodedDef, err := json.MarshalIndent(def, "", " ") require.NoError(t, err) require.JSONEq(t, `{ @@ -445,3 +455,28 @@ func TestTableDefinition(t *testing.T) { require.Equal(t, timodel.ActionAddColumn, event.Type) require.Equal(t, uint64(100), event.CommitTs) } + +func TestTableDefinitionSum32(t *testing.T) { + t.Parallel() + + def, _ := generateTableDef() + checksum1, err := def.Sum32(nil) + require.NoError(t, err) + checksum2, err := def.Sum32(nil) + require.NoError(t, err) + require.Equal(t, checksum1, checksum2) + + n := len(def.Columns) + newCol := make([]TableCol, n) + copy(newCol, def.Columns) + newDef := def + newDef.Columns = newCol + + for i := 0; i < n; i++ { + target := rand.Intn(n) + newDef.Columns[i], newDef.Columns[target] = newDef.Columns[target], newDef.Columns[i] + newChecksum, err := newDef.Sum32(nil) + require.NoError(t, err) + require.Equal(t, checksum1, newChecksum) + } +} diff --git a/tests/integration_tests/canal_json_storage_basic/run.sh b/tests/integration_tests/canal_json_storage_basic/run.sh index 97e6cb03098..2f3f6c50564 100644 --- a/tests/integration_tests/canal_json_storage_basic/run.sh +++ b/tests/integration_tests/canal_json_storage_basic/run.sh @@ -25,7 +25,6 @@ function run() { run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml run_sql_file $CUR/data/schema.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql_file $CUR/data/schema.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" sleep 8 diff --git a/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql b/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql index 54640dfbdc5..70892c9fea3 100644 --- a/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql +++ b/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql @@ -21,10 +21,11 @@ alter table t1 drop partition p1; insert into t1 values (7),(8),(9); update t1 set a=a+10 where a=9; -create table t2 (a int primary key); -ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; -insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/ -insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/ +/* TODO(CharlesCheung): EXCHANGE PARTITION will be supported in the future */ +-- create 
table t2 (a int primary key); +-- ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; +-- insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/ +-- insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/ /* REORGANIZE is not supported by release v6.5 */ -- ALTER TABLE t1 REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (5), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN (21)); diff --git a/tests/integration_tests/canal_json_storage_partition_table/run.sh b/tests/integration_tests/canal_json_storage_partition_table/run.sh index e730c57dcb4..82e97bbf31f 100644 --- a/tests/integration_tests/canal_json_storage_partition_table/run.sh +++ b/tests/integration_tests/canal_json_storage_partition_table/run.sh @@ -20,8 +20,6 @@ function run() { cd $WORK_DIR run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - # TODO(CharlesCheung): remove this after schema level ddl is supported by storage sink - run_sql "create database partition_table;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix cdc0 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix cdc1 diff --git a/tests/integration_tests/csv_storage_basic/run.sh b/tests/integration_tests/csv_storage_basic/run.sh index b0af86bfb43..0dc534a894a 100644 --- a/tests/integration_tests/csv_storage_basic/run.sh +++ b/tests/integration_tests/csv_storage_basic/run.sh @@ -24,7 +24,6 @@ function run() { run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml run_sql_file $CUR/data/schema.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql_file $CUR/data/schema.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" sleep 8 diff --git a/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh b/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh index 1a847a66f66..62686f15a20 100755 --- a/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh +++ b/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh @@ -32,7 +32,6 @@ function run() { cf_err1="test-error-1" cf_err2="test-error-2" - run_sql "create database multi_tables_ddl_test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} SINK_URI1="file://$WORK_DIR/storage_test/$TOPIC_NAME_1?flush-interval=5s&protocol=csv" cdc cli changefeed create -c=$cf_normal --start-ts=$start_ts --sink-uri="$SINK_URI1" --config="$CUR/conf/normal.toml" diff --git a/tests/integration_tests/csv_storage_partition_table/data/prepare.sql b/tests/integration_tests/csv_storage_partition_table/data/prepare.sql index 1e0326f65f1..6f0dff7e0e8 100644 --- a/tests/integration_tests/csv_storage_partition_table/data/prepare.sql +++ b/tests/integration_tests/csv_storage_partition_table/data/prepare.sql @@ -21,10 +21,11 @@ alter table t1 drop partition p1; insert into t1 values (7),(8),(9); -- update t1 set a=a+10 where a=9; -create table t2 (a int primary key); -ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; -insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/ -insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/ +/* TODO(CharlesCheung): EXCHANGE PARTITION will be supported in the future */ +-- create table t2 (a 
int primary key); +-- ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; +-- insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/ +-- insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/ /* REORGANIZE is not supported by release v6.5 */ -- ALTER TABLE t1 REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (5), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN (21)); diff --git a/tests/integration_tests/csv_storage_partition_table/run.sh b/tests/integration_tests/csv_storage_partition_table/run.sh index a192f50a706..155d986aa74 100644 --- a/tests/integration_tests/csv_storage_partition_table/run.sh +++ b/tests/integration_tests/csv_storage_partition_table/run.sh @@ -20,8 +20,6 @@ function run() { cd $WORK_DIR run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - # TODO(CharlesCheung): remove this after schema level ddl is supported by storage sink - run_sql "create database partition_table;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix 0 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix 1
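
The checksum logic above is what keeps the schema path stable across "no-op" DDL replays: marshalWithoutQuery drops the Query field and sorts columns by name, so Sum32 only changes when the effective column layout changes. A minimal sketch of that property follows; it is not part of the patch, uses only struct field names visible in this hunk plus the Query field implied by MarshalWithQuery, and the column contents are hypothetical.

package cloudstorage

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestSum32IgnoresQueryAndColumnOrder is a minimal sketch: two definitions
// that differ only in column order and in the DDL query text are expected to
// hash to the same value, because marshalWithoutQuery sorts columns by name
// and leaves the Query field out of the hashed payload.
func TestSum32IgnoresQueryAndColumnOrder(t *testing.T) {
	t.Parallel()

	colA := TableCol{Name: "a"}
	colB := TableCol{Name: "b"}

	def1 := TableDefinition{
		Schema:       "test",
		Table:        "table1",
		TableVersion: 100,
		TotalColumns: 2,
		Columns:      []TableCol{colA, colB},
		Query:        "create table table1(a int primary key, b int)",
	}
	def2 := def1
	def2.Columns = []TableCol{colB, colA}                               // same columns, different order
	def2.Query = "CREATE TABLE `table1` (`a` INT PRIMARY KEY, `b` INT)" // different query text

	sum1, err := def1.Sum32(nil)
	require.NoError(t, err)
	sum2, err := def2.Sum32(nil)
	require.NoError(t, err)
	require.Equal(t, sum1, sum2)
}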
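
GenerateSchemaFilePath ties the schema file name to the table version and the Sum32 checksum, so an unchanged definition always resolves to the same object and a new schema file (and with it a new data directory version) only appears when a DDL actually alters the definition. Below is a minimal sketch of that determinism, reusing the generateTableDef helper added in the test diff above; the exact path layout comes from the unexported generateSchemaFilePath helper and is not reproduced here.

package cloudstorage

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestGenerateSchemaFilePathIsDeterministic is a minimal sketch: the schema
// file path is derived only from schema, table, table version, and the Sum32
// checksum, so the same definition always maps to the same path.
func TestGenerateSchemaFilePathIsDeterministic(t *testing.T) {
	t.Parallel()

	def, _ := generateTableDef()
	path1, err := def.GenerateSchemaFilePath()
	require.NoError(t, err)
	path2, err := def.GenerateSchemaFilePath()
	require.NoError(t, err)

	// Both calls hash the same sorted, query-free payload, so the
	// checksum-suffixed file name is identical across calls.
	require.Equal(t, path1, path2)
}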