Skip to content

Commit

Permalink
sink/producer(ticdc): migrate test-infra to testify for kafka producer (
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Mar 23, 2022
1 parent 2e4e740 commit 4103d80
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 172 deletions.
141 changes: 67 additions & 74 deletions cdc/sink/producer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"fmt"
"net/url"
"strconv"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tiflow/cdc/sink/codec"
Expand All @@ -30,21 +30,20 @@ import (
"github.com/pingcap/tiflow/pkg/kafka"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
defer testleak.AfterTest(c)()
func TestNewSaramaConfig(t *testing.T) {
ctx := context.Background()
config := NewConfig()
config.Version = "invalid"
_, err := NewSaramaConfig(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")
require.Regexp(t, "invalid version.*", errors.Cause(err))
ctx = util.SetOwnerInCtx(ctx)
config.Version = "2.6.0"
config.ClientID = "^invalid$"
_, err = NewSaramaConfig(ctx, config)
c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue)
require.True(t, cerror.ErrKafkaInvalidClientID.Equal(err))

config.ClientID = "test-kafka-client"
compressionCases := []struct {
Expand All @@ -61,15 +60,15 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
for _, cc := range compressionCases {
config.Compression = cc.algorithm
cfg, err := NewSaramaConfig(ctx, config)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.Compression, check.Equals, cc.expected)
require.Nil(t, err)
require.Equal(t, cc.expected, cfg.Producer.Compression)
}

config.Credential = &security.Credential{
CAPath: "/invalid/ca/path",
}
_, err = NewSaramaConfig(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory")
require.Regexp(t, ".*no such file or directory", errors.Cause(err))

saslConfig := NewConfig()
saslConfig.Version = "2.6.0"
Expand All @@ -81,48 +80,45 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
}

cfg, err := NewSaramaConfig(ctx, saslConfig)
c.Assert(err, check.IsNil)
c.Assert(cfg, check.NotNil)
c.Assert(cfg.Net.SASL.User, check.Equals, "user")
c.Assert(cfg.Net.SASL.Password, check.Equals, "password")
c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256"))
require.Nil(t, err)
require.NotNil(t, cfg)
require.Equal(t, "user", cfg.Net.SASL.User)
require.Equal(t, "password", cfg.Net.SASL.Password)
require.Equal(t, sarama.SASLMechanism("SCRAM-SHA-256"), cfg.Net.SASL.Mechanism)
}

func (s *kafkaSuite) TestConfigTimeouts(c *check.C) {
defer testleak.AfterTest(c)()

func TestConfigTimeouts(t *testing.T) {
cfg := NewConfig()
c.Assert(cfg.DialTimeout, check.Equals, 10*time.Second)
c.Assert(cfg.ReadTimeout, check.Equals, 10*time.Second)
c.Assert(cfg.WriteTimeout, check.Equals, 10*time.Second)
require.Equal(t, 10*time.Second, cfg.DialTimeout)
require.Equal(t, 10*time.Second, cfg.ReadTimeout)
require.Equal(t, 10*time.Second, cfg.WriteTimeout)

saramaConfig, err := NewSaramaConfig(context.Background(), cfg)
c.Assert(err, check.IsNil)
c.Assert(saramaConfig.Net.DialTimeout, check.Equals, cfg.DialTimeout)
c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, cfg.WriteTimeout)
c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, cfg.ReadTimeout)
require.Nil(t, err)
require.Equal(t, cfg.DialTimeout, saramaConfig.Net.DialTimeout)
require.Equal(t, cfg.WriteTimeout, saramaConfig.Net.WriteTimeout)
require.Equal(t, cfg.ReadTimeout, saramaConfig.Net.ReadTimeout)

uri := "kafka://127.0.0.1:9092/kafka-test?dial-timeout=5s&read-timeout=1000ms" +
"&write-timeout=2m"
sinkURI, err := url.Parse(uri)
c.Assert(err, check.IsNil)
require.Nil(t, err)

err = cfg.Apply(sinkURI)
c.Assert(err, check.IsNil)
require.Nil(t, err)

c.Assert(cfg.DialTimeout, check.Equals, 5*time.Second)
c.Assert(cfg.ReadTimeout, check.Equals, 1000*time.Millisecond)
c.Assert(cfg.WriteTimeout, check.Equals, 2*time.Minute)
require.Equal(t, 5*time.Second, cfg.DialTimeout)
require.Equal(t, 1000*time.Millisecond, cfg.ReadTimeout)
require.Equal(t, 2*time.Minute, cfg.WriteTimeout)

saramaConfig, err = NewSaramaConfig(context.Background(), cfg)
c.Assert(err, check.IsNil)
c.Assert(saramaConfig.Net.DialTimeout, check.Equals, 5*time.Second)
c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, 1000*time.Millisecond)
c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, 2*time.Minute)
require.Nil(t, err)
require.Equal(t, 5*time.Second, saramaConfig.Net.DialTimeout)
require.Equal(t, 1000*time.Millisecond, saramaConfig.Net.ReadTimeout)
require.Equal(t, 2*time.Minute, saramaConfig.Net.WriteTimeout)
}

func (s *kafkaSuite) TestCompleteConfigByOpts(c *check.C) {
defer testleak.AfterTest(c)()
func TestCompleteConfigByOpts(t *testing.T) {
cfg := NewConfig()

// Normal config.
Expand All @@ -132,77 +128,74 @@ func (s *kafkaSuite) TestCompleteConfigByOpts(c *check.C) {
maxMessageSize := "4096" // 4kb
uri := fmt.Sprintf(uriTemplate, maxMessageSize)
sinkURI, err := url.Parse(uri)
c.Assert(err, check.IsNil)
require.Nil(t, err)

err = cfg.Apply(sinkURI)
c.Assert(err, check.IsNil)
c.Assert(cfg.PartitionNum, check.Equals, int32(1))
c.Assert(cfg.ReplicationFactor, check.Equals, int16(3))
c.Assert(cfg.Version, check.Equals, "2.6.0")
c.Assert(cfg.MaxMessageBytes, check.Equals, 4096)
require.Nil(t, err)
require.Equal(t, int32(1), cfg.PartitionNum)
require.Equal(t, int16(3), cfg.ReplicationFactor)
require.Equal(t, "2.6.0", cfg.Version)
require.Equal(t, 4096, cfg.MaxMessageBytes)

// multiple kafka broker endpoints
uri = "kafka://127.0.0.1:9092,127.0.0.1:9091,127.0.0.1:9090/kafka-test?"
sinkURI, err = url.Parse(uri)
c.Assert(err, check.IsNil)
require.Nil(t, err)
cfg = NewConfig()
err = cfg.Apply(sinkURI)
c.Assert(err, check.IsNil)
c.Assert(len(cfg.BrokerEndpoints), check.Equals, 3)
require.Nil(t, err)
require.Len(t, cfg.BrokerEndpoints, 3)

// Illegal replication-factor.
uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&replication-factor=a"
sinkURI, err = url.Parse(uri)
c.Assert(err, check.IsNil)
require.Nil(t, err)
cfg = NewConfig()
err = cfg.Apply(sinkURI)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*")
require.Regexp(t, ".*invalid syntax.*", errors.Cause(err))

// Illegal max-message-bytes.
uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&max-message-bytes=a"
sinkURI, err = url.Parse(uri)
c.Assert(err, check.IsNil)
require.Nil(t, err)
cfg = NewConfig()
err = cfg.Apply(sinkURI)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*")
require.Regexp(t, ".*invalid syntax.*", errors.Cause(err))

// Illegal partition-num.
uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=a"
sinkURI, err = url.Parse(uri)
c.Assert(err, check.IsNil)
require.Nil(t, err)
cfg = NewConfig()
err = cfg.Apply(sinkURI)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*")
require.Regexp(t, ".*invalid syntax.*", errors.Cause(err))

// Out of range partition-num.
uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0"
sinkURI, err = url.Parse(uri)
c.Assert(err, check.IsNil)
require.Nil(t, err)
cfg = NewConfig()
err = cfg.Apply(sinkURI)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid partition num.*")
require.Regexp(t, ".*invalid partition num.*", errors.Cause(err))
}

func (s *kafkaSuite) TestSetPartitionNum(c *check.C) {
defer testleak.AfterTest(c)()
func TestSetPartitionNum(t *testing.T) {
cfg := NewConfig()
err := cfg.setPartitionNum(2)
c.Assert(err, check.IsNil)
c.Assert(cfg.PartitionNum, check.Equals, int32(2))
require.Nil(t, err)
require.Equal(t, int32(2), cfg.PartitionNum)

cfg.PartitionNum = 1
err = cfg.setPartitionNum(2)
c.Assert(err, check.IsNil)
c.Assert(cfg.PartitionNum, check.Equals, int32(1))
require.Nil(t, err)
require.Equal(t, int32(1), cfg.PartitionNum)

cfg.PartitionNum = 3
err = cfg.setPartitionNum(2)
c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue)
require.True(t, cerror.ErrKafkaInvalidPartitionNum.Equal(err))
}

func (s *kafkaSuite) TestConfigurationCombinations(c *check.C) {
defer testleak.AfterTest(c)()

func TestConfigurationCombinations(t *testing.T) {
NewAdminClientImpl = kafka.NewMockAdminClient
defer func() {
NewAdminClientImpl = kafka.NewSaramaAdminClient
Expand Down Expand Up @@ -392,38 +385,38 @@ func (s *kafkaSuite) TestConfigurationCombinations(c *check.C) {

uri := fmt.Sprintf(a.uriTemplate, a.uriParams...)
sinkURI, err := url.Parse(uri)
c.Assert(err, check.IsNil)
require.Nil(t, err)

baseConfig := NewConfig()
err = baseConfig.Apply(sinkURI)
c.Assert(err, check.IsNil)
require.Nil(t, err)

saramaConfig, err := NewSaramaConfig(context.Background(), baseConfig)
c.Assert(err, check.IsNil)
require.Nil(t, err)

adminClient, err := NewAdminClientImpl([]string{sinkURI.Host}, saramaConfig)
c.Assert(err, check.IsNil)
require.Nil(t, err)

topic, ok := a.uriParams[0].(string)
c.Assert(ok, check.IsTrue)
c.Assert(topic, check.Not(check.Equals), "")
require.True(t, ok)
require.NotEqual(t, "", topic)
err = AdjustConfig(adminClient, baseConfig, saramaConfig, topic)
c.Assert(err, check.IsNil)
require.Nil(t, err)

encoderConfig := codec.NewConfig(config.ProtocolOpen, timeutil.SystemLocation())
err = encoderConfig.Apply(sinkURI, map[string]string{})
c.Assert(err, check.IsNil)
require.Nil(t, err)
encoderConfig.WithMaxMessageBytes(saramaConfig.Producer.MaxMessageBytes)

err = encoderConfig.Validate()
c.Assert(err, check.IsNil)
require.Nil(t, err)

// producer's `MaxMessageBytes` = encoder's `MaxMessageBytes`.
c.Assert(saramaConfig.Producer.MaxMessageBytes, check.Equals, encoderConfig.MaxMessageBytes())
require.Equal(t, encoderConfig.MaxMessageBytes(), saramaConfig.Producer.MaxMessageBytes)

expected, err := strconv.Atoi(a.expectedMaxMessageBytes)
c.Assert(err, check.IsNil)
c.Assert(saramaConfig.Producer.MaxMessageBytes, check.Equals, expected)
require.Nil(t, err)
require.Equal(t, expected, saramaConfig.Producer.MaxMessageBytes)

_ = adminClient.Close()
}
Expand Down
Loading

0 comments on commit 4103d80

Please sign in to comment.