Skip to content

Commit

Permalink
redo(ticdc): fix redo balckhole storage issues (pingcap#10023) (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 11, 2023
1 parent a07a2ba commit 51f01f3
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 25 deletions.
4 changes: 4 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ func (info *ChangeFeedInfo) RmUnusedFields() {
)
return
}
// blackhole is for testing purpose, no need to remove fields
if sink.IsBlackHoleScheme(uri.Scheme) {
return
}
if !sink.IsMQScheme(uri.Scheme) {
info.rmMQOnlyFields()
} else {
Expand Down
17 changes: 10 additions & 7 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,17 @@ func TestVerifyAndComplete(t *testing.T) {
t.Parallel()

info := &ChangeFeedInfo{
SinkURI: "blackhole://",
SinkURI: "mysql://",
StartTs: 417257993615179777,
Config: &config.ReplicaConfig{
MemoryQuota: 1073741824,
CaseSensitive: false,
CheckGCSafePoint: true,
SyncPointInterval: util.AddressOf(time.Minute * 10),
SyncPointRetention: util.AddressOf(time.Hour * 24),
MemoryQuota: 1073741824,
CaseSensitive: false,
CheckGCSafePoint: true,
EnableSyncPoint: util.AddressOf(false),
SyncPointInterval: util.AddressOf(time.Minute * 10),
SyncPointRetention: util.AddressOf(time.Hour * 24),
BDRMode: util.AddressOf(false),
IgnoreIneligibleTable: false,
},
}

Expand All @@ -261,7 +264,7 @@ func TestVerifyAndComplete(t *testing.T) {
require.Nil(t, err)
defaultConfig := config.GetDefaultReplicaConfig()
info2 := &ChangeFeedInfo{
SinkURI: "blackhole://",
SinkURI: "mysql://",
Config: defaultConfig,
}
info2.RmUnusedFields()
Expand Down
4 changes: 4 additions & 0 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func (m *metaManager) preStart(ctx context.Context) error {
}
// "nfs" and "local" scheme are converted to "file" scheme
redo.FixLocalScheme(uri)
// blackhole scheme is converted to "noop" scheme here, so we can use blackhole for testing
if redo.IsBlackholeStorage(uri.Scheme) {
uri, _ = storage.ParseRawURL("noop://")
}

extStorage, err := redo.InitExternalStorage(ctx, *uri)
if err != nil {
Expand Down
46 changes: 35 additions & 11 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,18 @@ func TestChangefeedStateUpdate(t *testing.T) {
Mounter: &config.MounterConfig{WorkerNum: 16},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Sink: &config.SinkConfig{
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
CSVConfig: config.GetDefaultReplicaConfig().Sink.CSVConfig,
EncoderConcurrency: config.GetDefaultReplicaConfig().Sink.EncoderConcurrency,
DateSeparator: config.GetDefaultReplicaConfig().Sink.DateSeparator,
EnablePartitionSeparator: config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
EnableKafkaSinkV2: config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
},
Integrity: config.GetDefaultReplicaConfig().Integrity,
Consistent: config.GetDefaultReplicaConfig().Consistent,
Integrity: config.GetDefaultReplicaConfig().Integrity,
ChangefeedErrorStuckDuration: config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
SQLMode: config.GetDefaultReplicaConfig().SQLMode,
Expand Down Expand Up @@ -171,11 +179,19 @@ func TestChangefeedStateUpdate(t *testing.T) {
Filter: &config.FilterConfig{Rules: []string{"*.*"}},
Mounter: &config.MounterConfig{WorkerNum: 16},
Sink: &config.SinkConfig{
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
CSVConfig: config.GetDefaultReplicaConfig().Sink.CSVConfig,
EncoderConcurrency: config.GetDefaultReplicaConfig().Sink.EncoderConcurrency,
DateSeparator: config.GetDefaultReplicaConfig().Sink.DateSeparator,
EnablePartitionSeparator: config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
EnableKafkaSinkV2: config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Consistent: config.GetDefaultReplicaConfig().Consistent,
ChangefeedErrorStuckDuration: config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
SQLMode: config.GetDefaultReplicaConfig().SQLMode,
Expand Down Expand Up @@ -231,11 +247,19 @@ func TestChangefeedStateUpdate(t *testing.T) {
Filter: &config.FilterConfig{Rules: []string{"*.*"}},
Mounter: &config.MounterConfig{WorkerNum: 16},
Sink: &config.SinkConfig{
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
EncoderConcurrency: config.GetDefaultReplicaConfig().Sink.EncoderConcurrency,
CSVConfig: config.GetDefaultReplicaConfig().Sink.CSVConfig,
DateSeparator: config.GetDefaultReplicaConfig().Sink.DateSeparator,
EnablePartitionSeparator: config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
EnableKafkaSinkV2: config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Consistent: config.GetDefaultReplicaConfig().Consistent,
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
ChangefeedErrorStuckDuration: config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
SQLMode: config.GetDefaultReplicaConfig().SQLMode,
Expand Down
5 changes: 5 additions & 0 deletions pkg/sink/sink_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ func IsPulsarScheme(scheme string) bool {
return scheme == PulsarScheme || scheme == PulsarSSLScheme
}

// IsBlackHoleScheme returns true if the scheme belong to blackhole scheme.
func IsBlackHoleScheme(scheme string) bool {
return scheme == BlackHoleScheme
}

// GetScheme returns the scheme of the url.
func GetScheme(url *url.URL) string {
return strings.ToLower(url.Scheme)
Expand Down
62 changes: 56 additions & 6 deletions tests/integration_tests/api_v2/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

Expand All @@ -31,6 +33,10 @@ var customReplicaConfig = &ReplicaConfig{
ForceReplicate: false,
IgnoreIneligibleTable: false,
CheckGCSafePoint: false,
BDRMode: util.AddressOf(false),
EnableSyncPoint: util.AddressOf(false),
SyncPointInterval: util.AddressOf(JSONDuration{duration: 10 * time.Minute}),
SyncPointRetention: util.AddressOf(JSONDuration{duration: 24 * time.Hour}),
Filter: &FilterConfig{
MySQLReplicationRules: &MySQLReplicationRules{
DoTables: []*Table{{"a", "b"}, {"c", "d"}},
Expand Down Expand Up @@ -65,6 +71,14 @@ var customReplicaConfig = &ReplicaConfig{
},
TxnAtomicity: "table",
Terminator: "a",
CSVConfig: &CSVConfig{
Quote: string(config.DoubleQuoteChar),
Delimiter: config.Comma,
NullString: config.NULL,
},
DateSeparator: "day",
EncoderConcurrency: util.AddressOf(32),
EnablePartitionSeparator: util.AddressOf(true),
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: false,
Expand All @@ -74,21 +88,43 @@ var customReplicaConfig = &ReplicaConfig{
IntegrityCheckLevel: "none",
CorruptionHandleLevel: "warn",
},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: 2000,
MetaFlushIntervalInMs: 200,
Storage: "",
UseFileBackend: false,
EncoderWorkerNum: 31,
FlushWorkerNum: 18,
},
}

// defaultReplicaConfig check if the default values is changed
var defaultReplicaConfig = &ReplicaConfig{
MemoryQuota: 1024 * 1024 * 1024,
CaseSensitive: false,
CheckGCSafePoint: true,
MemoryQuota: 1024 * 1024 * 1024,
CaseSensitive: false,
CheckGCSafePoint: true,
EnableSyncPoint: util.AddressOf(false),
SyncPointInterval: util.AddressOf(JSONDuration{duration: 10 * time.Minute}),
SyncPointRetention: util.AddressOf(JSONDuration{duration: 24 * time.Hour}),
BDRMode: util.AddressOf(false),
Filter: &FilterConfig{
Rules: []string{"*.*"},
},
Mounter: &MounterConfig{
WorkerNum: 16,
},
Sink: &SinkConfig{
Terminator: "\r\n",
CSVConfig: &CSVConfig{
Quote: string(config.DoubleQuoteChar),
Delimiter: config.Comma,
NullString: config.NULL,
},
Terminator: "\r\n",
DateSeparator: "day",
EncoderConcurrency: util.AddressOf(32),
EnablePartitionSeparator: util.AddressOf(true),
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: false,
Expand All @@ -98,6 +134,16 @@ var defaultReplicaConfig = &ReplicaConfig{
IntegrityCheckLevel: "none",
CorruptionHandleLevel: "warn",
},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: 2000,
MetaFlushIntervalInMs: 200,
EncoderWorkerNum: 16,
FlushWorkerNum: 8,
Storage: "",
UseFileBackend: false,
},
}

func testStatus(ctx context.Context, client *CDCRESTClient) error {
Expand Down Expand Up @@ -143,7 +189,9 @@ func testChangefeed(ctx context.Context, client *CDCRESTClient) error {
log.Panic("failed to unmarshal response", zap.String("body", string(resp.body)), zap.Error(err))
}
if !reflect.DeepEqual(cfInfo.Config, defaultReplicaConfig) {
log.Panic("config is not equals", zap.Any("add", defaultReplicaConfig), zap.Any("get", cfInfo.Config))
log.Panic("config is not equals",
zap.Any("add", defaultReplicaConfig),
zap.Any("get", cfInfo.Config))
}

// pause changefeed
Expand Down Expand Up @@ -191,7 +239,9 @@ func testChangefeed(ctx context.Context, client *CDCRESTClient) error {
log.Panic("unmarshal failed", zap.String("body", string(resp.body)), zap.Error(err))
}
if !reflect.DeepEqual(cf.Config, customReplicaConfig) {
log.Panic("config is not equals", zap.Any("update", customReplicaConfig), zap.Any("get", cf.Config))
log.Panic("config is not equals",
zap.Any("update", customReplicaConfig),
zap.Any("get", cf.Config))
}

// list changefeed
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/api_v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ type ConsistentConfig struct {
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
EncoderWorkerNum int `json:"encoder_worker_num"`
EncoderWorkerNum int `json:"encoding_worker_num"`
FlushWorkerNum int `json:"flush_worker_num"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
Expand Down

0 comments on commit 51f01f3

Please sign in to comment.